Skip to content

Commit

Permalink
Allows for idempotent close calls on temporal/spatial IndexCaches
Browse files Browse the repository at this point in the history
As well as some renaming and comment fixes
  • Loading branch information
tinwelint committed Apr 17, 2018
1 parent d5952c2 commit 1f5f1fe
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 27 deletions.
Expand Up @@ -106,7 +106,7 @@ public void refresh()
@Override
public void close() throws IOException
{
shutInstantiateCloseLock();
closeInstantiateCloseLock();
forAll( NativeSchemaIndexAccessor::close, this );
}

Expand Down
Expand Up @@ -42,6 +42,7 @@ class SpatialIndexCache<T> implements Iterable<T>
private final Factory<T> factory;
private ConcurrentHashMap<CoordinateReferenceSystem,T> spatials = new ConcurrentHashMap<>();
private final Lock instantiateCloseLock = new ReentrantLock();
// guarded by instantiateCloseLock
private boolean closed;

SpatialIndexCache( Factory<T> factory )
Expand All @@ -67,9 +68,10 @@ T uncheckedSelect( CoordinateReferenceSystem crs )
// Instantiate from factory. Do this under lock so that we coordinate with any concurrent call to close.
// Concurrent calls to instantiating parts won't contend with each other since there's only
// a single writer at a time anyway.
acquireInstantiateCloseLock();
instantiateCloseLock.lock();
try
{
assertOpen();
return spatials.computeIfAbsent( crs, key ->
{
try
Expand All @@ -88,19 +90,17 @@ T uncheckedSelect( CoordinateReferenceSystem crs )
}
}

private void acquireInstantiateCloseLock()
protected void assertOpen()
{
instantiateCloseLock.lock();
if ( closed )
{
instantiateCloseLock.unlock();
throw new IllegalStateException( this + " is already closed" );
}
}

void shutInstantiateCloseLock()
void closeInstantiateCloseLock()
{
acquireInstantiateCloseLock();
instantiateCloseLock.lock();
closed = true;
instantiateCloseLock.unlock();
}
Expand Down
Expand Up @@ -115,7 +115,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
@Override
public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException
{
shutInstantiateCloseLock();
closeInstantiateCloseLock();
for ( NativeSchemaIndexPopulator part : this )
{
part.close( populationCompletedSuccessfully );
Expand Down
Expand Up @@ -38,7 +38,7 @@
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedTime;

/**
* Cache for lazily creating parts of the temporal index. Each getOrCreatePart is created using the factory
* Cache for lazily creating parts of the temporal index. Each part is created using the factory
* the first time it is selected in a select() query, or the first time it's explicitly
* asked for using e.g. date().
* <p>
Expand All @@ -62,6 +62,7 @@ enum Offset

private final ConcurrentHashMap<Offset,T> cache = new ConcurrentHashMap<>();
private final Lock instantiateCloseLock = new ReentrantLock();
// guarded by instantiateCloseLock
private boolean closed;

TemporalIndexCache( Factory<T> factory )
Expand All @@ -70,11 +71,11 @@ enum Offset
}

/**
* Select the getOrCreatePart corresponding to the given ValueGroup. Creates the getOrCreatePart if needed,
* Select the path corresponding to the given ValueGroup. Creates the path if needed,
* and rethrows any create time exception as a RuntimeException.
*
* @param valueGroup target value group
* @return selected getOrCreatePart
* @return selected part
*/
T uncheckedSelect( ValueGroup valueGroup )
{
Expand Down Expand Up @@ -104,24 +105,31 @@ T uncheckedSelect( ValueGroup valueGroup )
}

/**
* Select the getOrCreatePart corresponding to the given ValueGroup. Creates the getOrCreatePart if needed,
* Select the part corresponding to the given ValueGroup. Creates the part if needed,
* in which case an exception of type E might be thrown.
*
* @param valueGroup target value group
* @return selected getOrCreatePart
* @return selected part
*/
T select( ValueGroup valueGroup ) throws IOException
{
return uncheckedSelect( valueGroup );
try
{
return uncheckedSelect( valueGroup );
}
catch ( UncheckedIOException e )
{
throw e.getCause();
}
}

/**
* Select the getOrCreatePart corresponding to the given ValueGroup, apply function to it and return the result.
* If the getOrCreatePart isn't created yet return orElse.
* Select the part corresponding to the given ValueGroup, apply function to it and return the result.
* If the part isn't created yet return orElse.
*
* @param valueGroup target value group
* @param function function to apply to getOrCreatePart
* @param orElse result to return if getOrCreatePart isn't created yet
* @param function function to apply to part
* @param orElse result to return if part isn't created yet
* @param <RESULT> type of result
* @return the result
*/
Expand Down Expand Up @@ -155,19 +163,17 @@ <RESULT> RESULT selectOrElse( ValueGroup valueGroup, Function<T,RESULT> function
return cachedValue != null ? function.apply( cachedValue ) : orElse;
}

private void acquireInstantiateCloseLock()
private void assertOpen()
{
instantiateCloseLock.lock();
if ( closed )
{
instantiateCloseLock.unlock();
throw new IllegalStateException( this + " is already closed" );
}
}

void shutInstantiateCloseLock()
{
acquireInstantiateCloseLock();
instantiateCloseLock.lock();
closed = true;
instantiateCloseLock.unlock();
}
Expand All @@ -183,9 +189,10 @@ private T getOrCreatePart( Offset key, ThrowingSupplier<T,IOException> factory )
// Instantiate from factory. Do this under lock so that we coordinate with any concurrent call to close.
// Concurrent calls to instantiating parts won't contend with each other since there's only
// a single writer at a time anyway.
acquireInstantiateCloseLock();
instantiateCloseLock.lock();
try
{
assertOpen();
return cache.computeIfAbsent( key, k ->
{
try
Expand Down
Expand Up @@ -19,15 +19,12 @@
*/
package org.neo4j.kernel.impl.index.schema;

import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -100,7 +97,7 @@ public void stressInstantiationWithClose() throws Throwable
}, 1 );
race.addContestant( () ->
{
cache.shutInstantiateCloseLock();
cache.closeInstantiateCloseLock();
instantiatedAtClose.setValue( count( cache ) );
}, 1 );

Expand Down

0 comments on commit 1f5f1fe

Please sign in to comment.