Skip to content

Commit

Permalink
LuceneIndexSampler doesn't prevent index drop
Browse files Browse the repository at this point in the history
Make LuceneIndexSampler closeable and close in OnlineIndexSamplingJob.

Before, TaskController owned by LuceneIndexSampler could be left open
if FusionIndexSampler failed before invoking LuceneIndexSampler.

Close IndexSampler in tests
  • Loading branch information
burqen committed Apr 9, 2019
1 parent 9d9a16a commit b8f70c3
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 41 deletions.
Expand Up @@ -370,4 +370,48 @@ public String toString()
return "{key=" + key + ", version=" + version + "}";
}
}

public static class Adaptor extends IndexProvider
{
protected Adaptor( Descriptor descriptor, int priority, IndexDirectoryStructure.Factory directoryStructureFactory )
{
super( descriptor, priority, directoryStructureFactory );
}

@Override
public IndexPopulator getPopulator( long indexId, SchemaIndexDescriptor descriptor, IndexSamplingConfig samplingConfig )
{
return null;
}

@Override
public IndexAccessor getOnlineAccessor( long indexId, SchemaIndexDescriptor descriptor, IndexSamplingConfig samplingConfig ) throws IOException
{
return null;
}

@Override
public String getPopulationFailure( long indexId, SchemaIndexDescriptor descriptor ) throws IllegalStateException
{
return null;
}

@Override
public InternalIndexState getInitialState( long indexId, SchemaIndexDescriptor descriptor )
{
return null;
}

@Override
public IndexCapability getCapability( SchemaIndexDescriptor schemaIndexDescriptor )
{
return null;
}

@Override
public StoreMigrationParticipant storeMigrationParticipant( FileSystemAbstraction fs, PageCache pageCache )
{
return null;
}
}
}
Expand Up @@ -63,9 +63,9 @@ public void run()
{
try
{
try ( IndexReader reader = indexProxy.newReader() )
try ( IndexReader reader = indexProxy.newReader();
IndexSampler sampler = reader.createSampler() )
{
IndexSampler sampler = reader.createSampler();
IndexSample sample = sampler.sampleIndex();

// check again if the index is online before saving the counts in the store
Expand Down
Expand Up @@ -23,6 +23,8 @@
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.IndexSampler;

import static org.neo4j.io.IOUtils.closeAllSilently;

public class FusionIndexSampler implements IndexSampler
{
private final IndexSampler[] samplers;
Expand Down Expand Up @@ -56,4 +58,10 @@ public static IndexSample combineSamples( IndexSample... samples )
}
return new IndexSample( indexSize, uniqueValues, sampleSize );
}

@Override
public void close()
{
closeAllSilently( samplers );
}
}
Expand Up @@ -29,8 +29,10 @@
/**
* Given a set of values selects a slot to use.
*/
interface SlotSelector
public interface SlotSelector
{
SlotSelector nullInstance = new NullInstance();

int INSTANCE_COUNT = 5;

int UNKNOWN = -1;
Expand Down Expand Up @@ -73,4 +75,18 @@ static void validateSelectorInstances( Object[] instances, int... aliveIndex )
}
}
}

class NullInstance implements SlotSelector
{
@Override
public void validateSatisfied( IndexProvider[] instances )
{ // no-op
}

@Override
public <V> int selectSlot( V[] values, Function<V,ValueGroup> groupOf )
{
throw new UnsupportedOperationException( "NullInstance cannot select a slot for you. Please use the real deal." );
}
}
}
Expand Up @@ -128,4 +128,46 @@ public void distinctValues( IndexProgressor.NodeValueClient client, PropertyAcce
// do nothing
}
};

class Adaptor implements IndexReader
{
@Override
public long countIndexedNodes( long nodeId, Value... propertyValues )
{
return 0;
}

@Override
public IndexSampler createSampler()
{
return null;
}

@Override
public PrimitiveLongResourceIterator query( IndexQuery... predicates ) throws IndexNotApplicableKernelException
{
return null;
}

@Override
public void query( IndexProgressor.NodeValueClient client, IndexOrder indexOrder, IndexQuery... query ) throws IndexNotApplicableKernelException
{
}

@Override
public boolean hasFullValuePrecision( IndexQuery... predicates )
{
return false;
}

@Override
public void distinctValues( IndexProgressor.NodeValueClient client, PropertyAccessor propertyAccessor )
{
}

@Override
public void close()
{
}
}
}
Expand Up @@ -19,12 +19,14 @@
*/
package org.neo4j.storageengine.api.schema;

import java.io.Closeable;

import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;

/**
* Component able to sample schema index.
*/
public interface IndexSampler
public interface IndexSampler extends Closeable
{
IndexSampler EMPTY = IndexSample::new;

Expand All @@ -35,4 +37,9 @@ public interface IndexSampler
* @throws IndexNotFoundKernelException if the index is dropped while sampling
*/
IndexSample sampleIndex() throws IndexNotFoundKernelException;

@Override
default void close()
{ // no-op
}
}
Expand Up @@ -592,10 +592,9 @@ public void shouldSampleIndex() throws Exception
// given
IndexEntryUpdate<SchemaIndexDescriptor>[] updates = layoutUtil.someUpdates();
processAll( updates );
try ( IndexReader reader = accessor.newReader() )
try ( IndexReader reader = accessor.newReader();
IndexSampler sampler = reader.createSampler() )
{
IndexSampler sampler = reader.createSampler();

// when
IndexSample sample = sampler.sampleIndex();

Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.List;

import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.IndexSampler;
Expand Down Expand Up @@ -66,4 +67,10 @@ public IndexSample combine( IndexSample sample1, IndexSample sample2 )
long sampleSize = Math.addExact( sample1.sampleSize(), sample2.sampleSize() );
return new IndexSample( indexSize, uniqueValues, sampleSize );
}

@Override
public void close()
{
IOUtils.closeAllSilently( indexSamplers );
}
}
Expand Up @@ -22,7 +22,6 @@
import org.neo4j.helpers.TaskControl;
import org.neo4j.helpers.TaskCoordinator;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.IndexSampler;

/**
Expand All @@ -38,27 +37,6 @@ abstract class LuceneIndexSampler implements IndexSampler
this.executionTicket = taskControl;
}

/**
* Perform actual sampling.
*
* @return the index sampling result.
* @throws IndexNotFoundKernelException if the index is dropped while sampling.
*/
protected abstract IndexSample performSampling() throws IndexNotFoundKernelException;

@Override
public final IndexSample sampleIndex() throws IndexNotFoundKernelException
{
try
{
return performSampling();
}
finally
{
completeSampling();
}
}

/**
* Check if sampling was canceled.
*
Expand All @@ -72,7 +50,8 @@ void checkCancellation() throws IndexNotFoundKernelException
}
}

private void completeSampling()
@Override
public void close()
{
executionTicket.close();
}
Expand Down
Expand Up @@ -57,7 +57,7 @@ public NonUniqueLuceneIndexSampler( IndexSearcher indexSearcher, TaskControl tas
}

@Override
protected IndexSample performSampling() throws IndexNotFoundKernelException
public IndexSample sampleIndex() throws IndexNotFoundKernelException
{
NonUniqueIndexSampler sampler = new DefaultNonUniqueIndexSampler( indexSamplingConfig.sampleSizeLimit() );
IndexReader indexReader = indexSearcher.getIndexReader();
Expand Down
Expand Up @@ -41,7 +41,7 @@ public UniqueLuceneIndexSampler( IndexSearcher indexSearcher, TaskControl taskCo
}

@Override
protected IndexSample performSampling() throws IndexNotFoundKernelException
public IndexSample sampleIndex() throws IndexNotFoundKernelException
{
UniqueIndexSampler sampler = new UniqueIndexSampler();
sampler.increment( indexSearcher.getIndexReader().numDocs() );
Expand Down
Expand Up @@ -32,14 +32,14 @@
import java.util.List;

import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.impl.schema.LuceneIndexAccessor;
import org.neo4j.kernel.api.impl.schema.LuceneSchemaIndexBuilder;
import org.neo4j.kernel.api.impl.schema.SchemaIndex;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -111,12 +111,12 @@ public void partitionedIndexPopulation() throws Exception
// now index is online and should contain updates data
assertTrue( uniqueIndex.isOnline() );

try ( IndexReader indexReader = indexAccessor.newReader() )
try ( IndexReader indexReader = indexAccessor.newReader();
IndexSampler indexSampler = indexReader.createSampler() )
{
long[] nodes = PrimitiveLongCollections.asArray( indexReader.query( IndexQuery.exists( 1 )) );
assertEquals( affectedNodes, nodes.length );

IndexSampler indexSampler = indexReader.createSampler();
IndexSample sample = indexSampler.sampleIndex();
assertEquals( affectedNodes, sample.indexSize() );
assertEquals( affectedNodes, sample.uniqueValues() );
Expand Down
Expand Up @@ -38,13 +38,13 @@
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.function.IOFunction;
import org.neo4j.helpers.TaskCoordinator;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.impl.index.storage.DirectoryFactory;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexQueryHelper;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -246,9 +246,10 @@ public void shouldStopSamplingWhenIndexIsDropped() throws Exception
return nothing;
}, null, waitingWhileIn( TaskCoordinator.class, "awaitCompletion" ), 3, SECONDS );

try ( IndexReader reader = indexReader /* do not inline! */ )
try ( IndexReader reader = indexReader /* do not inline! */;
IndexSampler sampler = indexSampler /* do not inline! */ )
{
indexSampler.sampleIndex();
sampler.sampleIndex();
fail( "expected exception" );
}
catch ( IndexNotFoundKernelException e )
Expand Down
Expand Up @@ -38,13 +38,13 @@
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.function.IOFunction;
import org.neo4j.helpers.TaskCoordinator;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.impl.index.storage.DirectoryFactory;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexQueryHelper;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -314,9 +314,10 @@ public void shouldStopSamplingWhenIndexIsDropped() throws Exception
return nothing;
}, null, waitingWhileIn( TaskCoordinator.class, "awaitCompletion" ), 3, SECONDS );

try ( IndexReader reader = indexReader /* do not inline! */ )
try ( IndexReader reader = indexReader /* do not inline! */;
IndexSampler sampler = indexSampler /* do not inline! */ )
{
indexSampler.sampleIndex();
sampler.sampleIndex();
fail( "expected exception" );
}
catch ( IndexNotFoundKernelException e )
Expand Down

0 comments on commit b8f70c3

Please sign in to comment.