Skip to content

Commit

Permalink
Coordinates instantiate/close in spatial/temporal indexes
Browse files Browse the repository at this point in the history
With one-to-one native indexes like number or string,
updates or interactions after call to close() is prohibited
and coordinated inside GBPTree. With the spatial/temporal index
structure, which has lazy instantiation of sub-indexes
there was no such coordination. This meant that a call to close()
could race with an instantiation of one or more sub-indexes.
This in turn would result in opened sub-indexes (GBPTree instances)
that would never get closed.

Contractually this is very good to tighten up so that they all
behave the same. Practically this was found in a population scenario
where an index population was cancelled while running.
  • Loading branch information
tinwelint committed Apr 16, 2018
1 parent 0179eb5 commit 03ff814
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 98 deletions.
Expand Up @@ -106,6 +106,7 @@ public void refresh()
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
shutInstantiateCloseLock();
forAll( NativeSchemaIndexAccessor::close, this ); forAll( NativeSchemaIndexAccessor::close, this );
} }


Expand Down
Expand Up @@ -22,8 +22,9 @@
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.values.storable.CoordinateReferenceSystem; import org.neo4j.values.storable.CoordinateReferenceSystem;
Expand All @@ -39,8 +40,9 @@
class SpatialIndexCache<T> implements Iterable<T> class SpatialIndexCache<T> implements Iterable<T>
{ {
private final Factory<T> factory; private final Factory<T> factory;

private ConcurrentHashMap<CoordinateReferenceSystem,T> spatials = new ConcurrentHashMap<>();
private Map<CoordinateReferenceSystem,T> spatials = new ConcurrentHashMap<>(); private final Lock instantiateCloseLock = new ReentrantLock();
private boolean closed;


SpatialIndexCache( Factory<T> factory ) SpatialIndexCache( Factory<T> factory )
{ {
Expand All @@ -56,17 +58,51 @@ class SpatialIndexCache<T> implements Iterable<T>
*/ */
T uncheckedSelect( CoordinateReferenceSystem crs ) T uncheckedSelect( CoordinateReferenceSystem crs )
{ {
return spatials.computeIfAbsent( crs, key -> T existing = spatials.get( crs );
if ( existing != null )
{ {
try return existing;
{ }
return factory.newSpatial( crs );
} // Instantiate from factory. Do this under lock so that we coordinate with any concurrent call to close.
catch ( IOException e ) // Concurrent calls to instantiating parts won't contend with each other since there's only
// a single writer at a time anyway.
acquireInstantiateCloseLock();
try
{
return spatials.computeIfAbsent( crs, key ->
{ {
throw new UncheckedIOException( e ); try
} {
} ); return factory.newSpatial( crs );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
}
finally
{
instantiateCloseLock.unlock();
}
}

private void acquireInstantiateCloseLock()
{
instantiateCloseLock.lock();
if ( closed )
{
instantiateCloseLock.unlock();
throw new IllegalStateException( this + " is already closed" );
}
}

void shutInstantiateCloseLock()
{
acquireInstantiateCloseLock();
closed = true;
instantiateCloseLock.unlock();
} }


/** /**
Expand Down
Expand Up @@ -24,8 +24,8 @@
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.stream.StreamSupport;
import java.util.Map; import java.util.Map;
import java.util.stream.StreamSupport;


import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration; import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -115,6 +115,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
@Override @Override
public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException
{ {
shutInstantiateCloseLock();
for ( NativeSchemaIndexPopulator part : this ) for ( NativeSchemaIndexPopulator part : this )
{ {
part.close( populationCompletedSuccessfully ); part.close( populationCompletedSuccessfully );
Expand Down
Expand Up @@ -96,6 +96,7 @@ public void refresh()
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
shutInstantiateCloseLock();
forAll( NativeSchemaIndexAccessor::close, this ); forAll( NativeSchemaIndexAccessor::close, this );
} }


Expand Down
Expand Up @@ -23,8 +23,11 @@
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.function.ThrowingSupplier;
import org.neo4j.values.storable.ValueGroup; import org.neo4j.values.storable.ValueGroup;


import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.date; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.date;
Expand All @@ -35,7 +38,7 @@
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedTime; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedTime;


/** /**
* Cache for lazily creating parts of the temporal index. Each part is created using the factory * Cache for lazily creating parts of the temporal index. Each getOrCreatePart is created using the factory
* the first time it is selected in a select() query, or the first time it's explicitly * the first time it is selected in a select() query, or the first time it's explicitly
* asked for using e.g. date(). * asked for using e.g. date().
* <p> * <p>
Expand All @@ -58,18 +61,20 @@ enum Offset
} }


private final ConcurrentHashMap<Offset,T> cache = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Offset,T> cache = new ConcurrentHashMap<>();
private final Lock instantiateCloseLock = new ReentrantLock();
private boolean closed;


TemporalIndexCache( Factory<T> factory ) TemporalIndexCache( Factory<T> factory )
{ {
this.factory = factory; this.factory = factory;
} }


/** /**
* Select the part corresponding to the given ValueGroup. Creates the part if needed, * Select the getOrCreatePart corresponding to the given ValueGroup. Creates the getOrCreatePart if needed,
* and rethrows any create time exception as a RuntimeException. * and rethrows any create time exception as a RuntimeException.
* *
* @param valueGroup target value group * @param valueGroup target value group
* @return selected part * @return selected getOrCreatePart
*/ */
T uncheckedSelect( ValueGroup valueGroup ) T uncheckedSelect( ValueGroup valueGroup )
{ {
Expand Down Expand Up @@ -99,31 +104,24 @@ T uncheckedSelect( ValueGroup valueGroup )
} }


/** /**
* Select the part corresponding to the given ValueGroup. Creates the part if needed, * Select the getOrCreatePart corresponding to the given ValueGroup. Creates the getOrCreatePart if needed,
* in which case an exception of type E might be thrown. * in which case an exception of type E might be thrown.
* *
* @param valueGroup target value group * @param valueGroup target value group
* @return selected part * @return selected getOrCreatePart
*/ */
T select( ValueGroup valueGroup ) throws IOException T select( ValueGroup valueGroup ) throws IOException
{ {
try return uncheckedSelect( valueGroup );
{
return uncheckedSelect( valueGroup );
}
catch ( UncheckedIOException e )
{
throw e.getCause();
}
} }


/** /**
* Select the part corresponding to the given ValueGroup, apply function to it and return the result. * Select the getOrCreatePart corresponding to the given ValueGroup, apply function to it and return the result.
* If the part isn't created yet return orElse. * If the getOrCreatePart isn't created yet return orElse.
* *
* @param valueGroup target value group * @param valueGroup target value group
* @param function function to apply to part * @param function function to apply to getOrCreatePart
* @param orElse result to return if part isn't created yet * @param orElse result to return if getOrCreatePart isn't created yet
* @param <RESULT> type of result * @param <RESULT> type of result
* @return the result * @return the result
*/ */
Expand Down Expand Up @@ -157,95 +155,83 @@ <RESULT> RESULT selectOrElse( ValueGroup valueGroup, Function<T,RESULT> function
return cachedValue != null ? function.apply( cachedValue ) : orElse; return cachedValue != null ? function.apply( cachedValue ) : orElse;
} }


private T date() throws UncheckedIOException private void acquireInstantiateCloseLock()
{ {
return cache.computeIfAbsent( date, d -> instantiateCloseLock.lock();
if ( closed )
{ {
try instantiateCloseLock.unlock();
{ throw new IllegalStateException( this + " is already closed" );
return factory.newDate(); }
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
} }


private T localDateTime() throws UncheckedIOException void shutInstantiateCloseLock()
{ {
acquireInstantiateCloseLock();
closed = true;
instantiateCloseLock.unlock();
}


return cache.computeIfAbsent( localDateTime, d -> private T getOrCreatePart( Offset key, ThrowingSupplier<T,IOException> factory ) throws UncheckedIOException
{
T existing = cache.get( key );
if ( existing != null )
{ {
try return existing;
{ }
return factory.newLocalDateTime();
} // Instantiate from factory. Do this under lock so that we coordinate with any concurrent call to close.
catch ( IOException e ) // Concurrent calls to instantiating parts won't contend with each other since there's only
// a single writer at a time anyway.
acquireInstantiateCloseLock();
try
{
return cache.computeIfAbsent( key, k ->
{ {
throw new UncheckedIOException( e ); try
} {
} ); return factory.get();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
}
finally
{
instantiateCloseLock.unlock();
}
}

private T date() throws UncheckedIOException
{
return getOrCreatePart( date, factory::newDate );
}

private T localDateTime() throws UncheckedIOException
{
return getOrCreatePart( localDateTime, factory::newLocalDateTime );
} }


private T zonedDateTime() throws UncheckedIOException private T zonedDateTime() throws UncheckedIOException
{ {
return cache.computeIfAbsent( zonedDateTime, d -> return getOrCreatePart( zonedDateTime, factory::newZonedDateTime );
{
try
{
return factory.newZonedDateTime();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
} }


private T localTime() throws UncheckedIOException private T localTime() throws UncheckedIOException
{ {
return cache.computeIfAbsent( localTime, d -> return getOrCreatePart( localTime, factory::newLocalTime );
{
try
{
return factory.newLocalTime();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
} }


private T zonedTime() throws UncheckedIOException private T zonedTime() throws UncheckedIOException
{ {
return cache.computeIfAbsent( zonedTime, d -> return getOrCreatePart( zonedTime, factory::newZonedTime );
{
try
{
return factory.newZonedTime();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
} }


private T duration() throws UncheckedIOException private T duration() throws UncheckedIOException
{ {
return cache.computeIfAbsent( duration, d -> return getOrCreatePart( duration, factory::newDuration );
{
try
{
return factory.newDuration();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
} );
} }


void loadAll() void loadAll()
Expand Down
Expand Up @@ -24,9 +24,8 @@
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.stream.StreamSupport;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference; import java.util.stream.StreamSupport;


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
Expand Down Expand Up @@ -110,6 +109,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
@Override @Override
public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException
{ {
shutInstantiateCloseLock();
for ( NativeSchemaIndexPopulator part : this ) for ( NativeSchemaIndexPopulator part : this )
{ {
part.close( populationCompletedSuccessfully ); part.close( populationCompletedSuccessfully );
Expand Down

0 comments on commit 03ff814

Please sign in to comment.