Skip to content

Commit

Permalink
Moved methods for sub-instances to InstanceSelector
Browse files Browse the repository at this point in the history
Instead of them being static in FusionIndexBase.
InstanceSelector also got a close method which will prevent
lazy instantiation after being closed.
  • Loading branch information
tinwelint committed May 16, 2018
1 parent 350fe08 commit 09b0928
Show file tree
Hide file tree
Showing 18 changed files with 394 additions and 177 deletions.
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

abstract class IndexPartsCache<KEY,T> implements Iterable<T>
{
final ConcurrentHashMap<KEY,T> cache = new ConcurrentHashMap<>();
final Lock instantiateCloseLock = new ReentrantLock();
// guarded by instantiateCloseLock
private boolean closed;

void assertOpen()
{
if ( closed )
{
throw new IllegalStateException( this + " is already closed" );
}
}

void closeInstantiateCloseLock()
{
instantiateCloseLock.lock();
closed = true;
instantiateCloseLock.unlock();
}

@Override
public Iterator<T> iterator()
{
return cache.values().iterator();
}
}
Expand Up @@ -21,10 +21,6 @@


import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.values.storable.CoordinateReferenceSystem; import org.neo4j.values.storable.CoordinateReferenceSystem;
Expand All @@ -37,13 +33,9 @@
* *
* @param <T> Type of parts * @param <T> Type of parts
*/ */
class SpatialIndexCache<T> implements Iterable<T> class SpatialIndexCache<T> extends IndexPartsCache<CoordinateReferenceSystem,T>
{ {
private final Factory<T> factory; private final Factory<T> factory;
private ConcurrentHashMap<CoordinateReferenceSystem,T> spatials = new ConcurrentHashMap<>();
private final Lock instantiateCloseLock = new ReentrantLock();
// guarded by instantiateCloseLock
private boolean closed;


SpatialIndexCache( Factory<T> factory ) SpatialIndexCache( Factory<T> factory )
{ {
Expand All @@ -59,7 +51,7 @@ class SpatialIndexCache<T> implements Iterable<T>
*/ */
T uncheckedSelect( CoordinateReferenceSystem crs ) T uncheckedSelect( CoordinateReferenceSystem crs )
{ {
T existing = spatials.get( crs ); T existing = cache.get( crs );
if ( existing != null ) if ( existing != null )
{ {
return existing; return existing;
Expand All @@ -72,7 +64,7 @@ T uncheckedSelect( CoordinateReferenceSystem crs )
try try
{ {
assertOpen(); assertOpen();
return spatials.computeIfAbsent( crs, key -> return cache.computeIfAbsent( crs, key ->
{ {
try try
{ {
Expand All @@ -90,21 +82,6 @@ T uncheckedSelect( CoordinateReferenceSystem crs )
} }
} }


protected void assertOpen()
{
if ( closed )
{
throw new IllegalStateException( this + " is already closed" );
}
}

void closeInstantiateCloseLock()
{
instantiateCloseLock.lock();
closed = true;
instantiateCloseLock.unlock();
}

/** /**
* Select the part corresponding to the given CoordinateReferenceSystem. Creates the part if needed, * Select the part corresponding to the given CoordinateReferenceSystem. Creates the part if needed,
* in which case an exception of type E might be thrown. * in which case an exception of type E might be thrown.
Expand Down Expand Up @@ -136,7 +113,7 @@ T select( CoordinateReferenceSystem crs ) throws IOException
*/ */
<RESULT> RESULT selectOrElse( CoordinateReferenceSystem crs, Function<T, RESULT> function, RESULT orElse ) <RESULT> RESULT selectOrElse( CoordinateReferenceSystem crs, Function<T, RESULT> function, RESULT orElse )
{ {
T part = spatials.get( crs ); T part = cache.get( crs );
if ( part == null ) if ( part == null )
{ {
return orElse; return orElse;
Expand All @@ -152,12 +129,6 @@ void loadAll()
} }
} }


@Override
public Iterator<T> iterator()
{
return spatials.values().iterator();
}

/** /**
* Factory used by the SpatialIndexCache to create parts. * Factory used by the SpatialIndexCache to create parts.
* *
Expand Down
Expand Up @@ -96,7 +96,7 @@ public void refresh()
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
shutInstantiateCloseLock(); closeInstantiateCloseLock();
forAll( NativeSchemaIndexAccessor::close, this ); forAll( NativeSchemaIndexAccessor::close, this );
} }


Expand Down
Expand Up @@ -21,10 +21,6 @@


import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.function.ThrowingSupplier; import org.neo4j.function.ThrowingSupplier;
Expand All @@ -46,7 +42,7 @@
* *
* @param <T> Type of parts * @param <T> Type of parts
*/ */
class TemporalIndexCache<T> implements Iterable<T> class TemporalIndexCache<T> extends IndexPartsCache<TemporalIndexCache.Offset,T>
{ {
private final Factory<T> factory; private final Factory<T> factory;


Expand All @@ -60,11 +56,6 @@ enum Offset
duration duration
} }


private final ConcurrentHashMap<Offset,T> cache = new ConcurrentHashMap<>();
private final Lock instantiateCloseLock = new ReentrantLock();
// guarded by instantiateCloseLock
private boolean closed;

TemporalIndexCache( Factory<T> factory ) TemporalIndexCache( Factory<T> factory )
{ {
this.factory = factory; this.factory = factory;
Expand Down Expand Up @@ -163,21 +154,6 @@ <RESULT> RESULT selectOrElse( ValueGroup valueGroup, Function<T,RESULT> function
return cachedValue != null ? function.apply( cachedValue ) : orElse; return cachedValue != null ? function.apply( cachedValue ) : orElse;
} }


private void assertOpen()
{
if ( closed )
{
throw new IllegalStateException( this + " is already closed" );
}
}

void shutInstantiateCloseLock()
{
instantiateCloseLock.lock();
closed = true;
instantiateCloseLock.unlock();
}

private T getOrCreatePart( Offset key, ThrowingSupplier<T,IOException> factory ) throws UncheckedIOException private T getOrCreatePart( Offset key, ThrowingSupplier<T,IOException> factory ) throws UncheckedIOException
{ {
T existing = cache.get( key ); T existing = cache.get( key );
Expand Down Expand Up @@ -258,12 +234,6 @@ void loadAll()
} }
} }


@Override
public Iterator<T> iterator()
{
return cache.values().iterator();
}

/** /**
* Factory used by the TemporalIndexCache to create parts. * Factory used by the TemporalIndexCache to create parts.
* *
Expand Down
Expand Up @@ -109,7 +109,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
@Override @Override
public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException
{ {
shutInstantiateCloseLock(); closeInstantiateCloseLock();
for ( NativeSchemaIndexPopulator part : this ) for ( NativeSchemaIndexPopulator part : this )
{ {
part.close( populationCompletedSuccessfully ); part.close( populationCompletedSuccessfully );
Expand Down
Expand Up @@ -62,7 +62,7 @@ class FusionIndexAccessor extends FusionIndexBase<IndexAccessor> implements Inde
@Override @Override
public void drop() throws IOException public void drop() throws IOException
{ {
forAll( IndexAccessor::drop, instanceSelector ); instanceSelector.forAll( IndexAccessor::drop );
dropAction.drop( indexId ); dropAction.drop( indexId );
} }


Expand All @@ -77,19 +77,19 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
@Override @Override
public void force( IOLimiter ioLimiter ) throws IOException public void force( IOLimiter ioLimiter ) throws IOException
{ {
forAll( accessor -> accessor.force( ioLimiter ), instanceSelector ); instanceSelector.forAll( accessor -> accessor.force( ioLimiter ) );
} }


@Override @Override
public void refresh() throws IOException public void refresh() throws IOException
{ {
forAll( IndexAccessor::refresh, instanceSelector ); instanceSelector.forAll( IndexAccessor::refresh );
} }


@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
forAll( IndexAccessor::close, instanceSelector ); instanceSelector.close( IndexAccessor::close );
} }


@Override @Override
Expand All @@ -103,7 +103,7 @@ public IndexReader newReader()
@Override @Override
public BoundedIterable<Long> newAllEntriesReader() public BoundedIterable<Long> newAllEntriesReader()
{ {
BoundedIterable<Long>[] entries = instancesAs( new BoundedIterable[INSTANCE_COUNT], IndexAccessor::newAllEntriesReader ); BoundedIterable<Long>[] entries = instanceSelector.instancesAs( new BoundedIterable[INSTANCE_COUNT], IndexAccessor::newAllEntriesReader );
return new BoundedIterable<Long>() return new BoundedIterable<Long>()
{ {
@Override @Override
Expand Down Expand Up @@ -149,7 +149,8 @@ public Iterator<Long> iterator()
@Override @Override
public ResourceIterator<File> snapshotFiles() throws IOException public ResourceIterator<File> snapshotFiles() throws IOException
{ {
return concatResourceIterators( iterator( instancesAs( new ResourceIterator[INSTANCE_COUNT], accessor -> accessor.snapshotFiles() ) ) ); return concatResourceIterators(
iterator( instanceSelector.instancesAs( new ResourceIterator[INSTANCE_COUNT], accessor -> accessor.snapshotFiles() ) ) );
} }


@Override @Override
Expand Down
Expand Up @@ -22,16 +22,11 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.collection.primitive.PrimitiveIntCollections;
import org.neo4j.function.ThrowingConsumer; import org.neo4j.function.ThrowingConsumer;
import org.neo4j.function.ThrowingFunction;
import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueGroup; import org.neo4j.values.storable.ValueGroup;


import static org.neo4j.kernel.impl.index.schema.fusion.SlotSelector.INSTANCE_COUNT;

/** /**
* Acting as a simplifier for the multiplexing that is going in inside a fusion index. A fusion index consists of multiple parts, * Acting as a simplifier for the multiplexing that is going in inside a fusion index. A fusion index consists of multiple parts,
* each handling one or more value groups. Each instance, be it a reader, populator or accessor should extend this class * each handling one or more value groups. Each instance, be it a reader, populator or accessor should extend this class
Expand All @@ -52,67 +47,6 @@ public abstract class FusionIndexBase<T>
this.instanceSelector = instanceSelector; this.instanceSelector = instanceSelector;
} }


/**
* Short-hand for calling the static {@link #instancesAs(InstanceSelector, Object[], ThrowingFunction)}, here with the local {@link #instanceSelector}.
*/
<R,E extends Exception> R[] instancesAs( R[] target, ThrowingFunction<T,R,E> converter ) throws E
{
return instancesAs( instanceSelector, target, converter );
}

/**
* Convenience method typically for calling a method on each of the sub-parts of a fusion entity,
* one which creates another instance. All those instances are returned as an array, or actually put into an array
* created by the caller to avoid reflection to instantiate the array.
*
* @param instanceSelector {@link InstanceSelector} to use as the source.
* @param target array to put the created instances into, also returned.
* @param converter {@link ThrowingFunction} which converts from the source to target instance.
* @param <S> type of source instance.
* @param <T> type of target instance.
* @param <E> type of exception that converter may throw.
* @return the target array which was passed in, now populated.
* @throws E exception from converter.
*/
static <S,T,E extends Exception> T[] instancesAs( InstanceSelector<S> instanceSelector, T[] target, ThrowingFunction<S,T,E> converter ) throws E
{
for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{
target[slot] = converter.apply( instanceSelector.select( slot ) );
}
return target;
}

static <T, E extends Exception> void forInstantiated( ThrowingConsumer<T,E> consumer, InstanceSelector<T> instanceSelector ) throws E
{
E exception = null;
for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{
T instance = instanceSelector.getIfInstantiated( slot );
if ( instance != null )
{
exception = consume( exception, consumer, instance );
}
}
if ( exception != null )
{
throw exception;
}
}

public static <T, E extends Exception> void forAll( ThrowingConsumer<T,E> consumer, InstanceSelector<T> instanceSelector ) throws E
{
E exception = null;
for ( int slot = 0; slot < INSTANCE_COUNT; slot++ )
{
exception = consume( exception, consumer, instanceSelector.select( slot ) );
}
if ( exception != null )
{
throw exception;
}
}

/** /**
* See {@link #forAll(ThrowingConsumer, Object[])} * See {@link #forAll(ThrowingConsumer, Object[])}
* *
Expand Down

0 comments on commit 09b0928

Please sign in to comment.