Skip to content

Commit

Permalink
MultiIndexPopulator force flush all populators after store scan is co…
Browse files Browse the repository at this point in the history
…mplete

Needed because MultiIndexPopulator batches populator updates when scanning store
and last batch may not have been applied at end of store scan.
  • Loading branch information
burqen authored and tinwelint committed Aug 29, 2017
1 parent 20987c7 commit de4e57d
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 37 deletions.
Expand Up @@ -36,7 +36,6 @@
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.PopulationProgress;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles; import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;


import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -153,9 +152,10 @@ private void awaitCompletion()
/** /**
* Insert all batched updates into corresponding indexes. * Insert all batched updates into corresponding indexes.
*/ */
private void flushAll() @Override
protected void flushAll()
{ {
populations.forEach( population -> flush( population ) ); populations.forEach( this::flush );
} }


/** /**
Expand Down Expand Up @@ -272,21 +272,19 @@ private int getNumberOfPopulationWorkers()
* *
* @param <E> type of the exception this store scan might get. * @param <E> type of the exception this store scan might get.
*/ */
private class BatchingStoreScan<E extends Exception> implements StoreScan<E> private class BatchingStoreScan<E extends Exception> extends DelegatingStoreScan<E>
{ {
final StoreScan<E> delegate;

BatchingStoreScan( StoreScan<E> delegate ) BatchingStoreScan( StoreScan<E> delegate )
{ {
this.delegate = delegate; super( delegate );
} }


@Override @Override
public void run() throws E public void run() throws E
{ {
try try
{ {
delegate.run(); super.run();
log.info( "Completed node store scan. " + log.info( "Completed node store scan. " +
"Flushing all pending updates." + EOL + BatchingMultipleIndexPopulator.this ); "Flushing all pending updates." + EOL + BatchingMultipleIndexPopulator.this );
flushAll(); flushAll();
Expand All @@ -305,30 +303,5 @@ public void run() throws E
} }
shutdownExecutor( false ); shutdownExecutor( false );
} }

@Override
public void stop()
{
delegate.stop();
}

@Override
public void acceptUpdate( MultipleIndexUpdater updater, IndexEntryUpdate update,
long currentlyIndexedNodeId )
{
delegate.acceptUpdate( updater, update, currentlyIndexedNodeId );
}

@Override
public PopulationProgress getProgress()
{
return delegate.getProgress();
}

@Override
public void configure( Collection<IndexPopulation> populations )
{
delegate.configure( populations );
}
} }
} }
Expand Up @@ -51,6 +51,7 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.PopulationProgress;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles; import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;


import static java.lang.String.format; import static java.lang.String.format;
Expand Down Expand Up @@ -170,7 +171,15 @@ public StoreScan<IndexPopulationFailedKernelException> indexAllNodes()


storeScan = storeView.visitNodes( labelIds, propertyKeyIdFilter, new NodePopulationVisitor(), null, false ); storeScan = storeView.visitNodes( labelIds, propertyKeyIdFilter, new NodePopulationVisitor(), null, false );
storeScan.configure( populations ); storeScan.configure( populations );
return storeScan; return new DelegatingStoreScan<IndexPopulationFailedKernelException>( storeScan )
{
@Override
public void run() throws IndexPopulationFailedKernelException
{
super.run();
flushAll();
}
};
} }


/** /**
Expand Down Expand Up @@ -349,6 +358,21 @@ protected void populateFromQueue( long currentlyIndexedNodeId )
populateFromQueueIfAvailable( currentlyIndexedNodeId ); populateFromQueueIfAvailable( currentlyIndexedNodeId );
} }


protected void flushAll()
{
for ( IndexPopulation population : populations )
{
try
{
population.populator.add( population.takeCurrentBatch() );
}
catch ( Throwable failure )
{
fail( population, failure );
}
}
}

private void populateFromQueueIfAvailable( long currentlyIndexedNodeId ) private void populateFromQueueIfAvailable( long currentlyIndexedNodeId )
{ {
if ( !queue.isEmpty() ) if ( !queue.isEmpty() )
Expand Down Expand Up @@ -568,4 +592,44 @@ private void add( NodeUpdates updates )
} }
} }
} }

protected class DelegatingStoreScan<E extends Exception> implements StoreScan<E>
{
private final StoreScan<E> delegate;

DelegatingStoreScan( StoreScan<E> delegate )
{
this.delegate = delegate;
}

@Override
public void run() throws E
{
delegate.run();
}

@Override
public void stop()
{
delegate.stop();
}

@Override
public void acceptUpdate( MultipleIndexUpdater updater, IndexEntryUpdate update, long currentlyIndexedNodeId )
{
delegate.acceptUpdate( updater, update, currentlyIndexedNodeId );
}

@Override
public PopulationProgress getProgress()
{
return delegate.getProgress();
}

@Override
public void configure( Collection<IndexPopulation> populations )
{
delegate.configure( populations );
}
}
} }
Expand Up @@ -162,7 +162,7 @@ public void shouldPopulateIndexWithOneNode() throws Exception
verify( populator ).create(); verify( populator ).create();
verify( populator ).configureSampling( false ); verify( populator ).configureSampling( false );
verify( populator ).includeSample( update ); verify( populator ).includeSample( update );
verify( populator ).add( any( Collection.class) ); verify( populator, times( 2 ) ).add( any( Collection.class) );
verify( populator ).sampleResult(); verify( populator ).sampleResult();
verify( populator ).close( true ); verify( populator ).close( true );


Expand Down Expand Up @@ -211,7 +211,7 @@ public void shouldPopulateIndexWithASmallDataset() throws Exception
verify( populator ).configureSampling( false ); verify( populator ).configureSampling( false );
verify( populator ).includeSample( update1 ); verify( populator ).includeSample( update1 );
verify( populator ).includeSample( update2 ); verify( populator ).includeSample( update2 );
verify( populator ).add( Matchers.anyCollection() ); verify( populator, times( 2 ) ).add( Matchers.anyCollection() );
verify( populator ).sampleResult(); verify( populator ).sampleResult();
verify( populator ).close( true ); verify( populator ).close( true );


Expand Down
Expand Up @@ -259,7 +259,7 @@ public void shouldDeliverUpdatesThatOccurDuringPopulationToPopulator() throws Ex
InOrder order = inOrder( populator, accessor, updater); InOrder order = inOrder( populator, accessor, updater);
order.verify( populator ).create(); order.verify( populator ).create();
order.verify( populator ).includeSample( add( 1, "value1" ) ); order.verify( populator ).includeSample( add( 1, "value1" ) );
order.verify( populator ).add( any( Collection.class ) ); order.verify( populator, times( 2 ) ).add( any( Collection.class ) );


// invoked from indexAllNodes(), empty because the id we added (2) is bigger than the one we indexed (1) // invoked from indexAllNodes(), empty because the id we added (2) is bigger than the one we indexed (1)
// //
Expand Down

0 comments on commit de4e57d

Please sign in to comment.