Skip to content

Commit

Permalink
Hook in temporal index and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
OliviaYtterbrink authored and fickludd committed Mar 1, 2018
1 parent 79d23e0 commit 22eca84
Show file tree
Hide file tree
Showing 17 changed files with 395 additions and 138 deletions.
Expand Up @@ -33,7 +33,6 @@
import org.neo4j.internal.kernel.api.IndexOrder; import org.neo4j.internal.kernel.api.IndexOrder;
import org.neo4j.internal.kernel.api.IndexQuery; import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.io.IOUtils; import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.exceptions.index.IndexNotApplicableKernelException;
import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.storageengine.api.schema.IndexProgressor; import org.neo4j.storageengine.api.schema.IndexProgressor;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.index.internal.gbptree.Layout; import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector; import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexAccessor;
Expand Down Expand Up @@ -91,9 +92,12 @@ public void close() throws IOException, IndexEntryConflictException
} }


@Override @Override
public void force() throws IOException public void force( IOLimiter ioLimiter ) throws IOException
{ {
forAll( NativeSchemaIndexAccessor::force, this ); for ( NativeSchemaIndexAccessor part : this )
{
part.force( ioLimiter );
}
} }


@Override @Override
Expand Down
Expand Up @@ -43,19 +43,27 @@


class FusionIndexAccessor implements IndexAccessor class FusionIndexAccessor implements IndexAccessor
{ {
private final IndexAccessor nativeAccessor; private final IndexAccessor numberAccessor;
private final IndexAccessor spatialAccessor; private final IndexAccessor spatialAccessor;
private final IndexAccessor temporalAccessor;
private final IndexAccessor luceneAccessor; private final IndexAccessor luceneAccessor;
private final Selector selector; private final Selector selector;
private final long indexId; private final long indexId;
private final IndexDescriptor descriptor; private final IndexDescriptor descriptor;
private final DropAction dropAction; private final DropAction dropAction;


FusionIndexAccessor( IndexAccessor nativeAccessor, IndexAccessor spatialAccessor, IndexAccessor luceneAccessor, FusionIndexAccessor( IndexAccessor numberAccessor,
Selector selector, long indexId, IndexDescriptor descriptor, DropAction dropAction ) IndexAccessor spatialAccessor,
IndexAccessor temporalAccessor,
IndexAccessor luceneAccessor,
Selector selector,
long indexId,
IndexDescriptor descriptor,
DropAction dropAction )
{ {
this.nativeAccessor = nativeAccessor; this.numberAccessor = numberAccessor;
this.spatialAccessor = spatialAccessor; this.spatialAccessor = spatialAccessor;
this.temporalAccessor = temporalAccessor;
this.luceneAccessor = luceneAccessor; this.luceneAccessor = luceneAccessor;
this.selector = selector; this.selector = selector;
this.indexId = indexId; this.indexId = indexId;
Expand All @@ -66,71 +74,99 @@ class FusionIndexAccessor implements IndexAccessor
@Override @Override
public void drop() throws IOException public void drop() throws IOException
{ {
forAll( IndexAccessor::drop, nativeAccessor, spatialAccessor, luceneAccessor ); forAll( IndexAccessor::drop, numberAccessor, spatialAccessor, temporalAccessor, luceneAccessor );
dropAction.drop( indexId ); dropAction.drop( indexId );
} }


@Override @Override
public IndexUpdater newUpdater( IndexUpdateMode mode ) public IndexUpdater newUpdater( IndexUpdateMode mode )
{ {
return new FusionIndexUpdater( nativeAccessor.newUpdater( mode ), spatialAccessor.newUpdater( mode ), luceneAccessor.newUpdater( mode ), selector ); return new FusionIndexUpdater(
numberAccessor.newUpdater( mode ),
spatialAccessor.newUpdater( mode ),
temporalAccessor.newUpdater( mode ),
luceneAccessor.newUpdater( mode ), selector );
} }


@Override @Override
public void force( IOLimiter ioLimiter ) throws IOException public void force( IOLimiter ioLimiter ) throws IOException
{ {
nativeAccessor.force( ioLimiter ); numberAccessor.force( ioLimiter );
spatialAccessor.force( ioLimiter ); spatialAccessor.force( ioLimiter );
temporalAccessor.force( ioLimiter );
luceneAccessor.force( ioLimiter ); luceneAccessor.force( ioLimiter );
} }


@Override @Override
public void refresh() throws IOException public void refresh() throws IOException
{ {
nativeAccessor.refresh(); numberAccessor.refresh();
spatialAccessor.refresh();
temporalAccessor.refresh();
luceneAccessor.refresh(); luceneAccessor.refresh();
} }


@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
forAll( IndexAccessor::close, nativeAccessor, spatialAccessor, luceneAccessor ); forAll( IndexAccessor::close, numberAccessor, spatialAccessor, temporalAccessor, luceneAccessor );
} }


@Override @Override
public IndexReader newReader() public IndexReader newReader()
{ {
return new FusionIndexReader( nativeAccessor.newReader(), spatialAccessor.newReader(), luceneAccessor.newReader(), selector, descriptor ); return new FusionIndexReader(
numberAccessor.newReader(),
spatialAccessor.newReader(),
temporalAccessor.newReader(),
luceneAccessor.newReader(),
selector,
descriptor );
} }


@Override @Override
public BoundedIterable<Long> newAllEntriesReader() public BoundedIterable<Long> newAllEntriesReader()
{ {
BoundedIterable<Long> nativeAllEntries = nativeAccessor.newAllEntriesReader(); BoundedIterable<Long> numberAllEntries = numberAccessor.newAllEntriesReader();
BoundedIterable<Long> spatialAllEntries = spatialAccessor.newAllEntriesReader(); BoundedIterable<Long> spatialAllEntries = spatialAccessor.newAllEntriesReader();
BoundedIterable<Long> temporalAllEntries = temporalAccessor.newAllEntriesReader();
BoundedIterable<Long> luceneAllEntries = luceneAccessor.newAllEntriesReader(); BoundedIterable<Long> luceneAllEntries = luceneAccessor.newAllEntriesReader();
return new BoundedIterable<Long>() return new BoundedIterable<Long>()
{ {
@Override @Override
public long maxCount() public long maxCount()
{ {
long nativeMaxCount = nativeAllEntries.maxCount(); long numberMaxCount = numberAllEntries.maxCount();
long spatialMaxCount = spatialAllEntries.maxCount(); long spatialMaxCount = spatialAllEntries.maxCount();
long temporalMaxCount = temporalAllEntries.maxCount();
long luceneMaxCount = luceneAllEntries.maxCount(); long luceneMaxCount = luceneAllEntries.maxCount();
return nativeMaxCount == UNKNOWN_MAX_COUNT || spatialMaxCount == UNKNOWN_MAX_COUNT || luceneMaxCount == UNKNOWN_MAX_COUNT ? return existsUnknownMaxCount( numberMaxCount, spatialMaxCount, temporalMaxCount, luceneMaxCount ) ?
UNKNOWN_MAX_COUNT : nativeMaxCount + spatialMaxCount + luceneMaxCount; UNKNOWN_MAX_COUNT : numberMaxCount + spatialMaxCount + temporalMaxCount + luceneMaxCount;
} }


private boolean existsUnknownMaxCount( long... maxCounts )
{
for ( long maxCount : maxCounts )
{
if ( maxCount == UNKNOWN_MAX_COUNT )
{
return true;
}
}
return false;
}

@SuppressWarnings( "unchecked" )
@Override @Override
public void close() throws Exception public void close() throws Exception
{ {
forAll( BoundedIterable::close, nativeAllEntries, spatialAllEntries, luceneAllEntries ); forAll( BoundedIterable::close, numberAllEntries, spatialAllEntries, temporalAllEntries, luceneAllEntries );
} }


@Override @Override
public Iterator<Long> iterator() public Iterator<Long> iterator()
{ {
return Iterables.concat( nativeAllEntries, spatialAllEntries, luceneAllEntries ).iterator(); return Iterables.concat( numberAllEntries, spatialAllEntries, temporalAllEntries, luceneAllEntries ).iterator();
} }
}; };
} }
Expand All @@ -139,21 +175,25 @@ public Iterator<Long> iterator()
public ResourceIterator<File> snapshotFiles() throws IOException public ResourceIterator<File> snapshotFiles() throws IOException
{ {
return concatResourceIterators( return concatResourceIterators(
asList( nativeAccessor.snapshotFiles(), spatialAccessor.snapshotFiles(), luceneAccessor.snapshotFiles() ).iterator() ); asList( numberAccessor.snapshotFiles(),
spatialAccessor.snapshotFiles(),
temporalAccessor.snapshotFiles(),
luceneAccessor.snapshotFiles() ).iterator() );
} }


@Override @Override
public void verifyDeferredConstraints( PropertyAccessor propertyAccessor ) public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throws IndexEntryConflictException, IOException throws IndexEntryConflictException, IOException
{ {
nativeAccessor.verifyDeferredConstraints( propertyAccessor ); numberAccessor.verifyDeferredConstraints( propertyAccessor );
spatialAccessor.verifyDeferredConstraints( propertyAccessor ); spatialAccessor.verifyDeferredConstraints( propertyAccessor );
temporalAccessor.verifyDeferredConstraints( propertyAccessor );
luceneAccessor.verifyDeferredConstraints( propertyAccessor ); luceneAccessor.verifyDeferredConstraints( propertyAccessor );
} }


@Override @Override
public boolean isDirty() public boolean isDirty()
{ {
return nativeAccessor.isDirty(); return numberAccessor.isDirty() || spatialAccessor.isDirty() || temporalAccessor.isDirty() || luceneAccessor.isDirty();
} }
} }
Expand Up @@ -37,18 +37,20 @@


class FusionIndexPopulator implements IndexPopulator class FusionIndexPopulator implements IndexPopulator
{ {
private final IndexPopulator nativePopulator; private final IndexPopulator numberPopulator;
private final IndexPopulator spatialPopulator; private final IndexPopulator spatialPopulator;
private final IndexPopulator temporalPopulator;
private final IndexPopulator lucenePopulator; private final IndexPopulator lucenePopulator;
private final Selector selector; private final Selector selector;
private final long indexId; private final long indexId;
private final DropAction dropAction; private final DropAction dropAction;


FusionIndexPopulator( IndexPopulator nativePopulator, IndexPopulator spatialPopulator, FusionIndexPopulator( IndexPopulator numberPopulator, IndexPopulator spatialPopulator, IndexPopulator temporalPopulator,
IndexPopulator lucenePopulator, Selector selector, long indexId, DropAction dropAction ) IndexPopulator lucenePopulator, Selector selector, long indexId, DropAction dropAction )
{ {
this.nativePopulator = nativePopulator; this.numberPopulator = numberPopulator;
this.spatialPopulator = spatialPopulator; this.spatialPopulator = spatialPopulator;
this.temporalPopulator = temporalPopulator;
this.lucenePopulator = lucenePopulator; this.lucenePopulator = lucenePopulator;
this.selector = selector; this.selector = selector;
this.indexId = indexId; this.indexId = indexId;
Expand All @@ -58,15 +60,16 @@ class FusionIndexPopulator implements IndexPopulator
@Override @Override
public void create() throws IOException public void create() throws IOException
{ {
nativePopulator.create(); numberPopulator.create();
spatialPopulator.create(); spatialPopulator.create();
temporalPopulator.create();
lucenePopulator.create(); lucenePopulator.create();
} }


@Override @Override
public void drop() throws IOException public void drop() throws IOException
{ {
forAll( IndexPopulator::drop, nativePopulator, spatialPopulator, lucenePopulator ); forAll( IndexPopulator::drop, numberPopulator, spatialPopulator, temporalPopulator, lucenePopulator );
dropAction.drop( indexId ); dropAction.drop( indexId );
} }


Expand All @@ -75,55 +78,63 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws Inde
{ {
Collection<IndexEntryUpdate<?>> luceneBatch = new ArrayList<>(); Collection<IndexEntryUpdate<?>> luceneBatch = new ArrayList<>();
Collection<IndexEntryUpdate<?>> spatialBatch = new ArrayList<>(); Collection<IndexEntryUpdate<?>> spatialBatch = new ArrayList<>();
Collection<IndexEntryUpdate<?>> nativeBatch = new ArrayList<>(); Collection<IndexEntryUpdate<?>> temporalBatch = new ArrayList<>();
Collection<IndexEntryUpdate<?>> numberBatch = new ArrayList<>();
for ( IndexEntryUpdate<?> update : updates ) for ( IndexEntryUpdate<?> update : updates )
{ {
selector.select( nativeBatch, spatialBatch, luceneBatch, update.values() ).add( update ); selector.select( numberBatch, spatialBatch, temporalBatch, luceneBatch, update.values() ).add( update );
} }
lucenePopulator.add( luceneBatch ); lucenePopulator.add( luceneBatch );
spatialPopulator.add( spatialBatch ); spatialPopulator.add( spatialBatch );
nativePopulator.add( nativeBatch ); temporalPopulator.add( temporalBatch );
numberPopulator.add( numberBatch );
} }


@Override @Override
public void verifyDeferredConstraints( PropertyAccessor propertyAccessor ) public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
throws IndexEntryConflictException, IOException throws IndexEntryConflictException, IOException
{ {
nativePopulator.verifyDeferredConstraints( propertyAccessor ); numberPopulator.verifyDeferredConstraints( propertyAccessor );
spatialPopulator.verifyDeferredConstraints( propertyAccessor ); spatialPopulator.verifyDeferredConstraints( propertyAccessor );
temporalPopulator.verifyDeferredConstraints( propertyAccessor );
lucenePopulator.verifyDeferredConstraints( propertyAccessor ); lucenePopulator.verifyDeferredConstraints( propertyAccessor );
} }


@Override @Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) throws IOException public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) throws IOException
{ {
return new FusionIndexUpdater( return new FusionIndexUpdater(
nativePopulator.newPopulatingUpdater( accessor ), numberPopulator.newPopulatingUpdater( accessor ),
spatialPopulator.newPopulatingUpdater( accessor ), spatialPopulator.newPopulatingUpdater( accessor ),
temporalPopulator.newPopulatingUpdater( accessor ),
lucenePopulator.newPopulatingUpdater( accessor ), selector ); lucenePopulator.newPopulatingUpdater( accessor ), selector );
} }


@Override @Override
public void close( boolean populationCompletedSuccessfully ) throws IOException public void close( boolean populationCompletedSuccessfully ) throws IOException
{ {
forAll( populator -> populator.close( populationCompletedSuccessfully ), nativePopulator, spatialPopulator, lucenePopulator ); forAll( populator -> populator.close( populationCompletedSuccessfully ), numberPopulator, spatialPopulator, temporalPopulator, lucenePopulator );
} }


@Override @Override
public void markAsFailed( String failure ) throws IOException public void markAsFailed( String failure ) throws IOException
{ {
forAll( populator -> populator.markAsFailed( failure ), nativePopulator, spatialPopulator, lucenePopulator ); forAll( populator -> populator.markAsFailed( failure ), numberPopulator, spatialPopulator, temporalPopulator, lucenePopulator );
} }


@Override @Override
public void includeSample( IndexEntryUpdate<?> update ) public void includeSample( IndexEntryUpdate<?> update )
{ {
selector.select( nativePopulator, spatialPopulator, lucenePopulator, update.values() ).includeSample( update ); selector.select( numberPopulator, spatialPopulator, temporalPopulator, lucenePopulator, update.values() ).includeSample( update );
} }


@Override @Override
public IndexSample sampleResult() public IndexSample sampleResult()
{ {
return combineSamples( nativePopulator.sampleResult(), spatialPopulator.sampleResult() , lucenePopulator.sampleResult() ); return combineSamples(
numberPopulator.sampleResult(),
spatialPopulator.sampleResult(),
temporalPopulator.sampleResult(),
lucenePopulator.sampleResult() );
} }
} }

0 comments on commit 22eca84

Please sign in to comment.