Skip to content

Commit

Permalink
Make TemporalIndexCache use a ConcurrentHashMap
Browse files Browse the repository at this point in the history
  • Loading branch information
sherfert committed Apr 3, 2018
1 parent 26fe33a commit e715c56
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 135 deletions.
Expand Up @@ -47,7 +47,7 @@
import static org.neo4j.helpers.collection.Iterators.concatResourceIterators; import static org.neo4j.helpers.collection.Iterators.concatResourceIterators;
import static org.neo4j.kernel.impl.index.schema.fusion.FusionIndexBase.forAll; import static org.neo4j.kernel.impl.index.schema.fusion.FusionIndexBase.forAll;


class TemporalIndexAccessor extends TemporalIndexCache<TemporalIndexAccessor.PartAccessor<?>, IOException> implements IndexAccessor class TemporalIndexAccessor extends TemporalIndexCache<TemporalIndexAccessor.PartAccessor<?>> implements IndexAccessor
{ {
private final SchemaIndexDescriptor descriptor; private final SchemaIndexDescriptor descriptor;


Expand Down Expand Up @@ -197,7 +197,7 @@ public TemporalIndexPartReader<KEY> newReader()
} }
} }


static class PartFactory implements TemporalIndexCache.Factory<PartAccessor<?>, IOException> static class PartFactory implements TemporalIndexCache.Factory<PartAccessor<?>>
{ {
private final PageCache pageCache; private final PageCache pageCache;
private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
Expand Down
Expand Up @@ -19,33 +19,33 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import java.util.Arrays; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Objects; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.values.storable.ValueGroup; import org.neo4j.values.storable.ValueGroup;


import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.date; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.date;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.duration;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.localDateTime; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.localDateTime;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedDateTime;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.localTime; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.localTime;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedDateTime;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedTime; import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.zonedTime;
import static org.neo4j.kernel.impl.index.schema.TemporalIndexCache.Offset.duration;


/** /**
* Cache for lazily creating parts of the temporal index. Each part is created using the factory * Cache for lazily creating parts of the temporal index. Each part is created using the factory
* the first time it is selected in a select() query, or the first time it's explicitly * the first time it is selected in a select() query, or the first time it's explicitly
* asked for using e.g. date(). * asked for using e.g. date().
* * <p>
* Iterating over the cache will return all currently created parts. * Iterating over the cache will return all currently created parts.
* *
* @param <T> Type of parts * @param <T> Type of parts
* @param <E> Type of exception potentially thrown during creation
*/ */
class TemporalIndexCache<T, E extends Exception> implements Iterable<T> class TemporalIndexCache<T> implements Iterable<T>
{ {
private final Factory<T, E> factory; private final Factory<T> factory;


enum Offset enum Offset
{ {
Expand All @@ -57,20 +57,11 @@ enum Offset
duration duration
} }


private final Object dateLock = new Object(); private final ConcurrentHashMap<Offset,T> cache = new ConcurrentHashMap<>();
private final Object localDateTimeLock = new Object();
private final Object zonedDateTimeLock = new Object();
private final Object localTimeLock = new Object();
private final Object zonedTimeLock = new Object();
private final Object durationLock = new Object();


private T[] parts; TemporalIndexCache( Factory<T> factory )

TemporalIndexCache( Factory<T, E> factory )
{ {
this.factory = factory; this.factory = factory;
//noinspection unchecked
this.parts = (T[]) new Object[Offset.values().length];
} }


/** /**
Expand All @@ -81,26 +72,6 @@ enum Offset
* @return selected part * @return selected part
*/ */
T uncheckedSelect( ValueGroup valueGroup ) T uncheckedSelect( ValueGroup valueGroup )
{
try
{
return select( valueGroup );
}
catch ( Exception t )
{
throw new RuntimeException( t );
}
}

/**
* Select the part corresponding to the given ValueGroup. Creates the part if needed,
* in which case an exception of type E might be thrown.
*
* @param valueGroup target value group
* @return selected part
* @throws E exception potentially thrown during creation
*/
T select( ValueGroup valueGroup ) throws E
{ {
switch ( valueGroup ) switch ( valueGroup )
{ {
Expand All @@ -127,6 +98,25 @@ T select( ValueGroup valueGroup ) throws E
} }
} }


/**
* Select the part corresponding to the given ValueGroup. Creates the part if needed,
* in which case an exception of type E might be thrown.
*
* @param valueGroup target value group
* @return selected part
*/
T select( ValueGroup valueGroup ) throws IOException
{
try
{
return uncheckedSelect( valueGroup );
}
catch ( UncheckedIOException e )
{
throw e.getCause();
}
}

/** /**
* Select the part corresponding to the given ValueGroup, apply function to it and return the result. * Select the part corresponding to the given ValueGroup, apply function to it and return the result.
* If the part isn't created yet return orElse. * If the part isn't created yet return orElse.
Expand All @@ -137,103 +127,125 @@ T select( ValueGroup valueGroup ) throws E
* @param <RESULT> type of result * @param <RESULT> type of result
* @return the result * @return the result
*/ */
<RESULT> RESULT selectOrElse( ValueGroup valueGroup, Function<T, RESULT> function, RESULT orElse ) <RESULT> RESULT selectOrElse( ValueGroup valueGroup, Function<T,RESULT> function, RESULT orElse )
{ {
T cachedValue;
switch ( valueGroup ) switch ( valueGroup )
{ {
case DATE: case DATE:
return parts[date.ordinal()] != null ? function.apply( parts[date.ordinal()] ) : orElse; cachedValue = cache.get( date );

break;
case LOCAL_DATE_TIME: case LOCAL_DATE_TIME:
return parts[localDateTime.ordinal()] != null ? function.apply( parts[localDateTime.ordinal()] ) : orElse; cachedValue = cache.get( localDateTime );

break;
case ZONED_DATE_TIME: case ZONED_DATE_TIME:
return parts[zonedDateTime.ordinal()] != null ? function.apply( parts[zonedDateTime.ordinal()] ) : orElse; cachedValue = cache.get( zonedDateTime );

break;
case LOCAL_TIME: case LOCAL_TIME:
return parts[localTime.ordinal()] != null ? function.apply( parts[localTime.ordinal()] ) : orElse; cachedValue = cache.get( localTime );

break;
case ZONED_TIME: case ZONED_TIME:
return parts[zonedTime.ordinal()] != null ? function.apply( parts[zonedTime.ordinal()] ) : orElse; cachedValue = cache.get( zonedTime );

break;
case DURATION: case DURATION:
return parts[duration.ordinal()] != null ? function.apply( parts[duration.ordinal()] ) : orElse; cachedValue = cache.get( duration );

break;
default: default:
throw new IllegalStateException( "Unsupported value group " + valueGroup ); throw new IllegalStateException( "Unsupported value group " + valueGroup );
} }

return cachedValue != null ? function.apply( cachedValue ) : orElse;
} }


T date() throws E private T date() throws UncheckedIOException
{ {
synchronized ( dateLock ) return cache.computeIfAbsent( date, d ->
{ {
if ( parts[date.ordinal()] == null ) try
{ {
parts[date.ordinal()] = factory.newDate(); return factory.newDate();
} }
} catch ( IOException e )
return parts[date.ordinal()]; {
throw new UncheckedIOException( e );
}
} );
} }


T localDateTime() throws E private T localDateTime() throws UncheckedIOException
{ {
synchronized ( localDateTimeLock )
return cache.computeIfAbsent( localDateTime, d ->
{ {
if ( parts[localDateTime.ordinal()] == null ) try
{ {
parts[localDateTime.ordinal()] = factory.newLocalDateTime(); return factory.newLocalDateTime();
} }
} catch ( IOException e )
return parts[localDateTime.ordinal()]; {
throw new UncheckedIOException( e );
}
} );
} }


T zonedDateTime() throws E private T zonedDateTime() throws UncheckedIOException
{ {
synchronized ( zonedDateTimeLock ) return cache.computeIfAbsent( zonedDateTime, d ->
{ {
if ( parts[zonedDateTime.ordinal()] == null ) try
{ {
parts[zonedDateTime.ordinal()] = factory.newZonedDateTime(); return factory.newZonedDateTime();
} }
} catch ( IOException e )
return parts[zonedDateTime.ordinal()]; {
throw new UncheckedIOException( e );
}
} );
} }


T localTime() throws E private T localTime() throws UncheckedIOException
{ {
synchronized ( localTimeLock ) return cache.computeIfAbsent( localTime, d ->
{ {
if ( parts[localTime.ordinal()] == null ) try
{ {
parts[localTime.ordinal()] = factory.newLocalTime(); return factory.newLocalTime();
} }
} catch ( IOException e )
return parts[localTime.ordinal()]; {
throw new UncheckedIOException( e );
}
} );
} }


T zonedTime() throws E private T zonedTime() throws UncheckedIOException
{ {
synchronized ( zonedTimeLock ) return cache.computeIfAbsent( zonedTime, d ->
{ {
if ( parts[zonedTime.ordinal()] == null ) try
{ {
parts[zonedTime.ordinal()] = factory.newZonedTime(); return factory.newZonedTime();
} }
} catch ( IOException e )
return parts[zonedTime.ordinal()]; {
throw new UncheckedIOException( e );
}
} );
} }


T duration() throws E private T duration() throws UncheckedIOException
{ {
synchronized ( durationLock ) return cache.computeIfAbsent( duration, d ->
{ {
if ( parts[duration.ordinal()] == null ) try
{ {
parts[duration.ordinal()] = factory.newDuration(); return factory.newDuration();
} }
} catch ( IOException e )
return parts[duration.ordinal()]; {
throw new UncheckedIOException( e );
}
} );
} }


void loadAll() void loadAll()
Expand All @@ -256,22 +268,26 @@ void loadAll()
@Override @Override
public Iterator<T> iterator() public Iterator<T> iterator()
{ {
return Arrays.stream( parts ).filter( Objects::nonNull ).iterator(); return cache.values().iterator();
} }


/** /**
* Factory used by the TemporalIndexCache to create parts. * Factory used by the TemporalIndexCache to create parts.
* *
* @param <T> Type of parts * @param <T> Type of parts
* @param <E> Type of exception potentially thrown during create
*/ */
interface Factory<T, E extends Exception> interface Factory<T>
{ {
T newDate() throws E; T newDate() throws IOException;
T newLocalDateTime() throws E;
T newZonedDateTime() throws E; T newLocalDateTime() throws IOException;
T newLocalTime() throws E;
T newZonedTime() throws E; T newZonedDateTime() throws IOException;
T newDuration() throws E;
T newLocalTime() throws IOException;

T newZonedTime() throws IOException;

T newDuration() throws IOException;
} }
} }
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


Expand Down Expand Up @@ -63,7 +64,7 @@ Iterable<FileLayout> existing()
return existing; return existing;
} }


<T,E extends Exception> void loadExistingIndexes( TemporalIndexCache<T,E> indexCache ) throws E <T> void loadExistingIndexes( TemporalIndexCache<T> indexCache ) throws IOException
{ {
for ( FileLayout fileLayout : existing() ) for ( FileLayout fileLayout : existing() )
{ {
Expand Down

0 comments on commit e715c56

Please sign in to comment.