From 03ff8142c7b69fca001d66a08ec5bffb16e2d27a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Mon, 16 Apr 2018 21:45:16 +0200 Subject: [PATCH] Coordinates instantiate/close in spatial/temporal indexes With one-to-one native indexes like number or string, updates or interactions after call to close() is prohibited and coordinated inside GBPTree. With the spatial/temporal index structure, which has lazy instantiation of sub-indexes there was no such coordination. This meant that a call to close() could race with an instantiation of one or more sub-indexes. This in turn would result in opened sub-indexes (GBPTree instances) that would never get closed. Contractually this is very good to tighten up so that they all behave the same. Practically this was found in a population scenario where an index population was cancelled while running. --- .../index/schema/SpatialIndexAccessor.java | 1 + .../impl/index/schema/SpatialIndexCache.java | 60 +++++-- .../index/schema/SpatialIndexPopulator.java | 3 +- .../index/schema/TemporalIndexAccessor.java | 1 + .../impl/index/schema/TemporalIndexCache.java | 150 ++++++++---------- .../index/schema/TemporalIndexPopulator.java | 4 +- .../index/schema/SpatialIndexCacheTest.java | 52 +++++- .../index/schema/TemporalIndexCacheTest.java | 48 ++++++ 8 files changed, 221 insertions(+), 98 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexAccessor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexAccessor.java index 3884b5eda565..7246207df693 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexAccessor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexAccessor.java @@ -106,6 +106,7 @@ public void refresh() @Override public void close() throws IOException { + shutInstantiateCloseLock(); forAll( NativeSchemaIndexAccessor::close, this ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCache.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCache.java index 810443324a91..912ffaffa674 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCache.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCache.java @@ -22,8 +22,9 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.neo4j.values.storable.CoordinateReferenceSystem; @@ -39,8 +40,9 @@ class SpatialIndexCache implements Iterable { private final Factory factory; - - private Map spatials = new ConcurrentHashMap<>(); + private ConcurrentHashMap spatials = new ConcurrentHashMap<>(); + private final Lock instantiateCloseLock = new ReentrantLock(); + private boolean closed; SpatialIndexCache( Factory factory ) { @@ -56,17 +58,51 @@ class SpatialIndexCache implements Iterable */ T uncheckedSelect( CoordinateReferenceSystem crs ) { - return spatials.computeIfAbsent( crs, key -> + T existing = spatials.get( crs ); + if ( existing != null ) { - try - { - return factory.newSpatial( crs ); - } - catch ( IOException e ) + return existing; + } + + // Instantiate from factory. Do this under lock so that we coordinate with any concurrent call to close. + // Concurrent calls to instantiating parts won't contend with each other since there's only + // a single writer at a time anyway. + acquireInstantiateCloseLock(); + try + { + return spatials.computeIfAbsent( crs, key -> { - throw new UncheckedIOException( e ); - } - } ); + try + { + return factory.newSpatial( crs ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } ); + } + finally + { + instantiateCloseLock.unlock(); + } + } + + private void acquireInstantiateCloseLock() + { + instantiateCloseLock.lock(); + if ( closed ) + { + instantiateCloseLock.unlock(); + throw new IllegalStateException( this + " is already closed" ); + } + } + + void shutInstantiateCloseLock() + { + acquireInstantiateCloseLock(); + closed = true; + instantiateCloseLock.unlock(); } /** diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java index c4af647aa998..6c106feb295c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/SpatialIndexPopulator.java @@ -24,8 +24,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.stream.StreamSupport; import java.util.Map; +import java.util.stream.StreamSupport; import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration; import org.neo4j.io.fs.FileSystemAbstraction; @@ -115,6 +115,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) @Override public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException { + shutInstantiateCloseLock(); for ( NativeSchemaIndexPopulator part : this ) { part.close( populationCompletedSuccessfully ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexAccessor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexAccessor.java index b2aab992c592..a680ade4c2ae 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexAccessor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexAccessor.java @@ -96,6 +96,7 @@ public void refresh() @Override public void close() throws IOException { + shutInstantiateCloseLock(); forAll( NativeSchemaIndexAccessor::close, this ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCache.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCache.java index ac6b288ed7c5..eef8350ed5c3 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCache.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCache.java @@ -23,8 +23,11 @@ import java.io.UncheckedIOException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import org.neo4j.function.ThrowingSupplier; import org.neo4j.values.storable.ValueGroup; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.date; @@ -35,7 +38,7 @@ import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedTime; /** - * Cache for lazily creating parts of the temporal index. Each part is created using the factory + * Cache for lazily creating parts of the temporal index. Each getOrCreatePart is created using the factory * the first time it is selected in a select() query, or the first time it's explicitly * asked for using e.g. date(). *

@@ -58,6 +61,8 @@ enum Offset } private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + private final Lock instantiateCloseLock = new ReentrantLock(); + private boolean closed; TemporalIndexCache( Factory factory ) { @@ -65,11 +70,11 @@ enum Offset } /** - * Select the part corresponding to the given ValueGroup. Creates the part if needed, + * Select the getOrCreatePart corresponding to the given ValueGroup. Creates the getOrCreatePart if needed, * and rethrows any create time exception as a RuntimeException. * * @param valueGroup target value group - * @return selected part + * @return selected getOrCreatePart */ T uncheckedSelect( ValueGroup valueGroup ) { @@ -99,31 +104,24 @@ T uncheckedSelect( ValueGroup valueGroup ) } /** - * Select the part corresponding to the given ValueGroup. Creates the part if needed, + * Select the getOrCreatePart corresponding to the given ValueGroup. Creates the getOrCreatePart if needed, * in which case an exception of type E might be thrown. * * @param valueGroup target value group - * @return selected part + * @return selected getOrCreatePart */ T select( ValueGroup valueGroup ) throws IOException { - try - { - return uncheckedSelect( valueGroup ); - } - catch ( UncheckedIOException e ) - { - throw e.getCause(); - } + return uncheckedSelect( valueGroup ); } /** - * Select the part corresponding to the given ValueGroup, apply function to it and return the result. - * If the part isn't created yet return orElse. + * Select the getOrCreatePart corresponding to the given ValueGroup, apply function to it and return the result. + * If the getOrCreatePart isn't created yet return orElse. * * @param valueGroup target value group - * @param function function to apply to part - * @param orElse result to return if part isn't created yet + * @param function function to apply to getOrCreatePart + * @param orElse result to return if getOrCreatePart isn't created yet * @param type of result * @return the result */ @@ -157,95 +155,83 @@ RESULT selectOrElse( ValueGroup valueGroup, Function function return cachedValue != null ? function.apply( cachedValue ) : orElse; } - private T date() throws UncheckedIOException + private void acquireInstantiateCloseLock() { - return cache.computeIfAbsent( date, d -> + instantiateCloseLock.lock(); + if ( closed ) { - try - { - return factory.newDate(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } ); + instantiateCloseLock.unlock(); + throw new IllegalStateException( this + " is already closed" ); + } } - private T localDateTime() throws UncheckedIOException + void shutInstantiateCloseLock() { + acquireInstantiateCloseLock(); + closed = true; + instantiateCloseLock.unlock(); + } - return cache.computeIfAbsent( localDateTime, d -> + private T getOrCreatePart( Offset key, ThrowingSupplier factory ) throws UncheckedIOException + { + T existing = cache.get( key ); + if ( existing != null ) { - try - { - return factory.newLocalDateTime(); - } - catch ( IOException e ) + return existing; + } + + // Instantiate from factory. Do this under lock so that we coordinate with any concurrent call to close. + // Concurrent calls to instantiating parts won't contend with each other since there's only + // a single writer at a time anyway. + acquireInstantiateCloseLock(); + try + { + return cache.computeIfAbsent( key, k -> { - throw new UncheckedIOException( e ); - } - } ); + try + { + return factory.get(); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } ); + } + finally + { + instantiateCloseLock.unlock(); + } + } + + private T date() throws UncheckedIOException + { + return getOrCreatePart( date, factory::newDate ); + } + + private T localDateTime() throws UncheckedIOException + { + return getOrCreatePart( localDateTime, factory::newLocalDateTime ); } private T zonedDateTime() throws UncheckedIOException { - return cache.computeIfAbsent( zonedDateTime, d -> - { - try - { - return factory.newZonedDateTime(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } ); + return getOrCreatePart( zonedDateTime, factory::newZonedDateTime ); } private T localTime() throws UncheckedIOException { - return cache.computeIfAbsent( localTime, d -> - { - try - { - return factory.newLocalTime(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } ); + return getOrCreatePart( localTime, factory::newLocalTime ); } private T zonedTime() throws UncheckedIOException { - return cache.computeIfAbsent( zonedTime, d -> - { - try - { - return factory.newZonedTime(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } ); + return getOrCreatePart( zonedTime, factory::newZonedTime ); } private T duration() throws UncheckedIOException { - return cache.computeIfAbsent( duration, d -> - { - try - { - return factory.newDuration(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } - } ); + return getOrCreatePart( duration, factory::newDuration ); } void loadAll() diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java index 6b384c99a954..02eb06d93a0c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/TemporalIndexPopulator.java @@ -24,9 +24,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.stream.StreamSupport; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.StreamSupport; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; @@ -110,6 +109,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) @Override public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException { + shutInstantiateCloseLock(); for ( NativeSchemaIndexPopulator part : this ) { part.close( populationCompletedSuccessfully ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCacheTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCacheTest.java index a2fa04de8143..bb3c32967572 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/SpatialIndexCacheTest.java @@ -19,11 +19,15 @@ */ package org.neo4j.kernel.impl.index.schema; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableInt; import org.junit.Test; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -32,11 +36,15 @@ import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.helpers.collection.Iterators; +import org.neo4j.test.Race; import org.neo4j.values.storable.CoordinateReferenceSystem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.neo4j.helpers.collection.Iterables.count; + public class SpatialIndexCacheTest { - @SuppressWarnings( "Duplicates" ) @Test public void stressCache() throws Exception @@ -70,6 +78,48 @@ public void stressCache() throws Exception } } + @Test + public void stressInstantiationWithClose() throws Throwable + { + // given + StringFactory factory = new StringFactory(); + SpatialIndexCache cache = new SpatialIndexCache<>( factory ); + Race race = new Race().withRandomStartDelays(); + MutableInt instantiatedAtClose = new MutableInt(); + race.addContestant( () -> + { + try + { + cache.uncheckedSelect( CoordinateReferenceSystem.WGS84 ); + cache.uncheckedSelect( CoordinateReferenceSystem.Cartesian_3D ); + } + catch ( IllegalStateException e ) + { + // This exception is OK since it may have been closed + } + }, 1 ); + race.addContestant( () -> + { + cache.shutInstantiateCloseLock(); + instantiatedAtClose.setValue( count( cache ) ); + }, 1 ); + + // when + race.go(); + + // then + try + { + cache.uncheckedSelect( CoordinateReferenceSystem.Cartesian ); + fail( "No instantiation after closed" ); + } + catch ( IllegalStateException e ) + { + // good + } + assertEquals( instantiatedAtClose.intValue(), count( cache ) ); + } + private static final CoordinateReferenceSystem[] coordinateReferenceSystems = Iterators.stream( CoordinateReferenceSystem.all().iterator() ).toArray( CoordinateReferenceSystem[]::new ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCacheTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCacheTest.java index 48096f8efae9..eec8fb599ffe 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/TemporalIndexCacheTest.java @@ -19,6 +19,7 @@ */ package org.neo4j.kernel.impl.index.schema; +import org.apache.commons.lang3.mutable.MutableInt; import org.junit.Test; import java.io.IOException; @@ -27,11 +28,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.helpers.collection.Iterables; +import org.neo4j.test.Race; +import org.neo4j.values.storable.CoordinateReferenceSystem; import org.neo4j.values.storable.ValueGroup; import static org.hamcrest.CoreMatchers.equalTo; @@ -39,6 +43,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.neo4j.helpers.collection.Iterables.count; import static org.neo4j.values.storable.ValueGroup.DATE; import static org.neo4j.values.storable.ValueGroup.DURATION; import static org.neo4j.values.storable.ValueGroup.LOCAL_DATE_TIME; @@ -104,6 +110,48 @@ public void stressCache() throws Exception } } + @Test + public void stressInstantiationWithClose() throws Throwable + { + // given + StringFactory factory = new StringFactory(); + TemporalIndexCache cache = new TemporalIndexCache<>( factory ); + Race race = new Race().withRandomStartDelays(); + MutableInt instantiatedAtClose = new MutableInt(); + race.addContestant( () -> + { + try + { + cache.uncheckedSelect( valueGroups[0] ); + cache.uncheckedSelect( valueGroups[1] ); + } + catch ( IllegalStateException e ) + { + // This exception is OK since it may have been closed + } + }, 1 ); + race.addContestant( () -> + { + cache.shutInstantiateCloseLock(); + instantiatedAtClose.setValue( count( cache ) ); + }, 1 ); + + // when + race.go(); + + // then + try + { + cache.uncheckedSelect( valueGroups[2] ); + fail( "No instantiation after closed" ); + } + catch ( IllegalStateException e ) + { + // good + } + assertEquals( instantiatedAtClose.intValue(), count( cache ) ); + } + private static final ValueGroup[] valueGroups = {ZONED_DATE_TIME, LOCAL_DATE_TIME, DATE, ZONED_TIME, LOCAL_TIME, DURATION}; static class CacheStresser extends Thread