Skip to content

Commit

Permalink
Updates uses of IndexEntryUpdate to use generics
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Sep 7, 2017
1 parent 4db6eda commit 0cb8cb7
Show file tree
Hide file tree
Showing 57 changed files with 213 additions and 195 deletions.
Expand Up @@ -576,7 +576,7 @@ public void shouldReportNodesThatAreNotIndexed() throws Exception
for ( long nodeId : indexedNodes )
{
NodeUpdates updates = storeView.nodeAsUpdates( nodeId );
for ( IndexEntryUpdate update : updates.forIndexKeys( asList( descriptor ) ) )
for ( IndexEntryUpdate<?> update : updates.forIndexKeys( asList( descriptor ) ) )
{
updater.process( IndexEntryUpdate.remove( nodeId, descriptor, update.values() ) );
}
Expand Down
Expand Up @@ -32,6 +32,8 @@
* Subclasses of this represent events related to property changes due to property or label addition, deletion or
* update.
* This is of use in populating indexes that might be relevant to node label and property combinations.
*
* @param <INDEX_KEY> {@link LabelSchemaSupplier} specifying the schema
*/
public class IndexEntryUpdate<INDEX_KEY extends LabelSchemaSupplier>
{
Expand Down Expand Up @@ -94,7 +96,7 @@ public boolean equals( Object o )
return false;
}

IndexEntryUpdate that = (IndexEntryUpdate) o;
IndexEntryUpdate<?> that = (IndexEntryUpdate<?>) o;

if ( entityId != that.entityId )
{
Expand Down
Expand Up @@ -35,11 +35,15 @@ public interface IndexPopulator
{
/**
* Remove all data in the index and paves the way for populating an index.
*
* @throws IOException on I/O error.
*/
void create() throws IOException;

/**
* Closes and deletes this index.
*
* @throws IOException on I/O error.
*/
void drop() throws IOException;

Expand All @@ -53,6 +57,10 @@ public interface IndexPopulator
* @param updates batch of node property updates that needs to be inserted. Node ids will be retrieved using
* {@link IndexEntryUpdate#getEntityId()} method and property values will be retrieved using
* {@link IndexEntryUpdate#values()} method.
* @throws IndexEntryConflictException if this is a uniqueness index and any of the updates are detected
* to violate that constraint. Implementations may choose to not detect in this call, but instead do one efficient
* pass over the index in {@link #verifyDeferredConstraints(PropertyAccessor)}.
* @throws IOException on I/O error.
*/
void add( Collection<? extends IndexEntryUpdate<?>> updates )
throws IndexEntryConflictException, IOException;
Expand Down Expand Up @@ -89,6 +97,11 @@ void add( Collection<? extends IndexEntryUpdate<?>> updates )
* has been removed and need to be removed from this index as well. Note that this removal needs to be
* applied idempotently.</li>
* </ol>
*
* @param accessor accesses property data if implementation needs to be able look up property values while populating.
* @return an {@link IndexUpdater} which will funnel changes that happen concurrently with index population
* into the population and incorporating them as part of the index population.
* @throws IOException on I/O error.
*/
IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) throws IOException;

Expand All @@ -97,6 +110,12 @@ void add( Collection<? extends IndexEntryUpdate<?>> updates )
* If {@code populationCompletedSuccessfully} is {@code true} then it must mark this index
* as {@link InternalIndexState#ONLINE} so that future invocations of its parent
* {@link SchemaIndexProvider#getInitialState(long, IndexDescriptor)} also returns {@link InternalIndexState#ONLINE}.
*
* @param populationCompletedSuccessfully {@code true} if the index population was successful, where the index should
* be marked as {@link InternalIndexState#ONLINE}, otherwise {@code false} where index should be marked as
* {@link InternalIndexState#FAILED} and the failure, previously handed to this populator using {@link #markAsFailed(String)}
* should be stored and made available for later requests from {@link SchemaIndexProvider#getPopulationFailure(long)}.
* @throws IOException on I/O error.
*/
void close( boolean populationCompletedSuccessfully ) throws IOException;

Expand All @@ -115,15 +134,19 @@ void add( Collection<? extends IndexEntryUpdate<?>> updates )
*
* @param update update to include in sample
*/
void includeSample( IndexEntryUpdate update );
void includeSample( IndexEntryUpdate<?> update );

/**
* Configure specific type of sampling that should be used during index population.
* Depends from type of node scan that is used during index population
*
* @param onlineSampling should online (sampling based on index population and updates) be used
*/
void configureSampling( boolean onlineSampling );

/**
* @return {@link IndexSample} from samples collected by {@link #includeSample(IndexEntryUpdate)} calls.
*/
IndexSample sampleResult();

class Adapter implements IndexPopulator
Expand All @@ -144,6 +167,7 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws Inde
{
}

@Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
{
return SwallowingIndexUpdater.INSTANCE;
Expand All @@ -160,7 +184,7 @@ public void markAsFailed( String failure )
}

@Override
public void includeSample( IndexEntryUpdate update )
public void includeSample( IndexEntryUpdate<?> update )
{
}

Expand Down
Expand Up @@ -33,7 +33,7 @@
*/
public interface IndexUpdater extends AutoCloseable
{
void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException;
void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException;

@Override
void close() throws IOException, IndexEntryConflictException;
Expand Down
Expand Up @@ -162,7 +162,7 @@ public Future<Void> cancel()
*
* @param update {@link IndexEntryUpdate} to queue.
*/
public void update( IndexEntryUpdate update )
public void update( IndexEntryUpdate<?> update )
{
multiPopulator.queue( update );
}
Expand Down
Expand Up @@ -72,6 +72,7 @@ <FAILURE extends Exception> StoreScan<FAILURE> visitNodes(

void incrementIndexUpdates( long indexId, long updatesDelta );

@SuppressWarnings( "rawtypes" )
StoreScan EMPTY_SCAN = new StoreScan()
{
@Override
Expand Down Expand Up @@ -115,6 +116,7 @@ public Value getPropertyValue( long nodeId, int propertyKeyId ) throws EntityNot
return Values.NO_VALUE;
}

@SuppressWarnings( "unchecked" )
@Override
public <FAILURE extends Exception> StoreScan<FAILURE> visitNodes( int[] labelIds,
IntPredicate propertyKeyIdFilter, Visitor<NodeUpdates,FAILURE> propertyUpdateVisitor,
Expand Down Expand Up @@ -151,6 +153,5 @@ public DoubleLongRegister indexSample( long indexId, DoubleLongRegister output )
public void incrementIndexUpdates( long indexId, long updatesDelta )
{
}

};
}
Expand Up @@ -93,7 +93,7 @@ public class MultipleIndexPopulator implements IndexPopulator

// Concurrency queue since multiple concurrent threads may enqueue updates into it. It is important for this queue
// to have fast #size() method since it might be drained in batches
protected final Queue<IndexEntryUpdate> queue = new LinkedBlockingQueue<>();
protected final Queue<IndexEntryUpdate<?>> queue = new LinkedBlockingQueue<>();

// Populators are added into this list. The same thread adding populators will later call #indexAllNodes.
// Multiple concurrent threads might fail individual populations.
Expand Down Expand Up @@ -187,13 +187,15 @@ public void run() throws IndexPopulationFailedKernelException
*
* @param update {@link IndexEntryUpdate} to queue.
*/
public void queue( IndexEntryUpdate update )
public void queue( IndexEntryUpdate<?> update )
{
queue.add( update );
}

/**
* Called if forced failure from the outside
*
* @param failure index population failure.
*/
public void fail( Throwable failure )
{
Expand Down Expand Up @@ -279,7 +281,7 @@ public void markAsFailed( String failure ) throws IOException
}

@Override
public void includeSample( IndexEntryUpdate update )
public void includeSample( IndexEntryUpdate<?> update )
{
throw new UnsupportedOperationException( "Multiple index populator can't perform index sampling." );
}
Expand Down Expand Up @@ -383,7 +385,7 @@ private void populateFromQueueIfAvailable( long currentlyIndexedNodeId )
do
{
// no need to check for null as nobody else is emptying this queue
IndexEntryUpdate update = queue.poll();
IndexEntryUpdate<?> update = queue.poll();
storeScan.acceptUpdate( updater, update, currentlyIndexedNodeId );
}
while ( !queue.isEmpty() );
Expand Down Expand Up @@ -421,7 +423,7 @@ public static class MultipleIndexUpdater implements IndexUpdater
}

@Override
public void process( IndexEntryUpdate update )
public void process( IndexEntryUpdate<?> update )
{
Pair<IndexPopulation,IndexUpdater> pair = populationsWithUpdaters.get( update.indexKey().schema() );
if ( pair != null )
Expand Down Expand Up @@ -510,7 +512,7 @@ private void flipToFailed( Throwable t )
populator, failure( t ), indexCountsRemover, logProvider ) );
}

private void onUpdate( IndexEntryUpdate update )
private void onUpdate( IndexEntryUpdate<?> update )
{
populator.includeSample( update );
if ( batch( update ) )
Expand Down Expand Up @@ -602,7 +604,7 @@ public void stop()
}

@Override
public void acceptUpdate( MultipleIndexUpdater updater, IndexEntryUpdate update, long currentlyIndexedNodeId )
public void acceptUpdate( MultipleIndexUpdater updater, IndexEntryUpdate<?> update, long currentlyIndexedNodeId )
{
delegate.acceptUpdate( updater, update, currentlyIndexedNodeId );
}
Expand Down
Expand Up @@ -69,7 +69,7 @@ public IndexUpdater newUpdater( final IndexUpdateMode mode )
return new PopulatingIndexUpdater()
{
@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
job.update( update );
}
Expand All @@ -78,7 +78,7 @@ public void process( IndexEntryUpdate update ) throws IOException, IndexEntryCon
return new PopulatingIndexUpdater()
{
@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
throw new IllegalArgumentException( "Unsupported update mode: " + mode );
}
Expand Down
Expand Up @@ -30,7 +30,7 @@ public interface StoreScan<FAILURE extends Exception>

void stop();

void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate update,
void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId );

PopulationProgress getProgress();
Expand Down
Expand Up @@ -76,7 +76,7 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
return new DelegatingIndexUpdater( target.accessor.newUpdater( mode ) )
{
@Override
public void process( IndexEntryUpdate update )
public void process( IndexEntryUpdate<?> update )
throws IOException, IndexEntryConflictException
{
try
Expand Down
Expand Up @@ -35,7 +35,7 @@ public DelegatingIndexUpdater( IndexUpdater delegate )
}

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
delegate.process( update );
}
Expand Down
Expand Up @@ -30,7 +30,7 @@ public final class SwallowingIndexUpdater implements IndexUpdater
public static final IndexUpdater INSTANCE = new SwallowingIndexUpdater();

@Override
public void process( IndexEntryUpdate update )
public void process( IndexEntryUpdate<?> update )
{
// intentionally swallow this update
}
Expand Down
Expand Up @@ -37,10 +37,10 @@
public abstract class UniquePropertyIndexUpdater implements IndexUpdater
{
private final Map<Object, DiffSets<Long>> referenceCount = new HashMap<>();
private final ArrayList<IndexEntryUpdate> updates = new ArrayList<>();
private final ArrayList<IndexEntryUpdate<?>> updates = new ArrayList<>();

@Override
public void process( IndexEntryUpdate update )
public void process( IndexEntryUpdate<?> update )
{
// build uniqueness verification state
switch ( update.updateMode() )
Expand Down Expand Up @@ -70,7 +70,7 @@ public void close() throws IOException, IndexEntryConflictException
flushUpdates( updates );
}

protected abstract void flushUpdates( Iterable<IndexEntryUpdate> updates )
protected abstract void flushUpdates( Iterable<IndexEntryUpdate<?>> updates )
throws IOException, IndexEntryConflictException;

private DiffSets<Long> propertyValueDiffSet( Object value )
Expand Down
Expand Up @@ -41,7 +41,7 @@ public UpdateCountingIndexUpdater( IndexStoreView storeView, long indexId, Index
}

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
delegate.process( update );
updates++;
Expand Down
Expand Up @@ -50,7 +50,7 @@ class NativeNonUniqueSchemaNumberIndexPopulator<KEY extends SchemaNumberKey, VAL
}

@Override
public void includeSample( IndexEntryUpdate update )
public void includeSample( IndexEntryUpdate<?> update )
{
if ( updateSampling )
{
Expand Down
Expand Up @@ -127,7 +127,7 @@ public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) throws IOE
private final Collection<IndexEntryUpdate<?>> updates = new ArrayList<>();

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
assertOpen();
updates.add( update );
Expand Down Expand Up @@ -256,7 +256,7 @@ private static class IndexUpdateApply<KEY extends SchemaNumberKey, VALUE extends
this.conflictDetectingValueMerger = conflictDetectingValueMerger;
}

public void process( IndexEntryUpdate indexEntryUpdate ) throws Exception
public void process( IndexEntryUpdate<?> indexEntryUpdate ) throws Exception
{
NativeSchemaNumberIndexUpdater.processUpdate( treeKey, treeValue, indexEntryUpdate, writer, conflictDetectingValueMerger );
}
Expand Down
Expand Up @@ -59,7 +59,7 @@ NativeSchemaNumberIndexUpdater<KEY,VALUE> initialize( Writer<KEY,VALUE> writer,
}

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
assertOpen();
processUpdate( treeKey, treeValue, update, writer, conflictDetectingValueMerger );
Expand All @@ -84,7 +84,7 @@ private void assertOpen()
}

static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> void processUpdate( KEY treeKey, VALUE treeValue,
IndexEntryUpdate update, Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger )
IndexEntryUpdate<?> update, Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger )
throws IOException, IndexEntryConflictException
{
switch ( update.updateMode() )
Expand Down
Expand Up @@ -43,7 +43,7 @@ class NativeUniqueSchemaNumberIndexPopulator<KEY extends SchemaNumberKey, VALUE
}

@Override
public void includeSample( IndexEntryUpdate update )
public void includeSample( IndexEntryUpdate<?> update )
{
sampler.increment( 1 );
}
Expand Down
Expand Up @@ -122,7 +122,7 @@ public void markAsFailed( String failure ) throws IOException
}

@Override
public void includeSample( IndexEntryUpdate update )
public void includeSample( IndexEntryUpdate<?> update )
{
selector.select( nativePopulator, lucenePopulator, update.values() ).includeSample( update );
}
Expand Down
Expand Up @@ -40,7 +40,7 @@ class FusionIndexUpdater implements IndexUpdater
}

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
switch ( update.updateMode() )
{
Expand Down
Expand Up @@ -35,7 +35,7 @@
/**
* Store scan view that will try to minimize amount of scanned nodes by using label scan store {@link LabelScanStore}
* as a source of known labeled node ids.
* @param <FAILURE>
* @param <FAILURE> type of exception thrown on failure
*/
public class LabelScanViewNodeStoreScan<FAILURE extends Exception> extends StoreViewNodeStoreScan<FAILURE>
{
Expand Down

0 comments on commit 0cb8cb7

Please sign in to comment.