Skip to content

Commit

Permalink
Clear schema state on index flip to online
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd committed Jun 15, 2018
1 parent 4479a45 commit e5725d1
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 32 deletions.
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.logging.LogProvider;
import org.neo4j.util.FeatureToggles;

Expand Down Expand Up @@ -76,10 +77,11 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
*
* @param storeView the view of the store as a visitable of nodes
* @param logProvider the log provider
* @param schemaState the schema state
*/
BatchingMultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider )
BatchingMultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider, SchemaState schemaState )
{
super( storeView, logProvider );
super( storeView, logProvider, schemaState );
this.executor = createThreadPool();
}

Expand All @@ -91,10 +93,12 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
* @param storeView the view of the store as a visitable of nodes
* @param executor the thread pool to use for batched index insertions
* @param logProvider the log provider
* @param schemaState the schema state
*/
BatchingMultipleIndexPopulator( IndexStoreView storeView, ExecutorService executor, LogProvider logProvider )
BatchingMultipleIndexPopulator( IndexStoreView storeView, ExecutorService executor, LogProvider logProvider,
SchemaState schemaState )
{
super( storeView, logProvider );
super( storeView, logProvider, schemaState );
this.executor = executor;
}

Expand Down
Expand Up @@ -43,15 +43,13 @@ public class IndexPopulationJob implements Runnable
private final IndexingService.Monitor monitor;
private final MultipleIndexPopulator multiPopulator;
private final CountDownLatch doneSignal = new CountDownLatch( 1 );
private final SchemaState schemaState;

private volatile StoreScan<IndexPopulationFailedKernelException> storeScan;
private volatile boolean cancelled;

public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor, SchemaState schemaState )
public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor )
{
this.multiPopulator = multiPopulator;
this.schemaState = schemaState;
this.monitor = monitor;
}

Expand Down Expand Up @@ -113,8 +111,6 @@ public void run()
return;
}
multiPopulator.flipAfterPopulation();

schemaState.clear();
}
catch ( Throwable t )
{
Expand Down
Expand Up @@ -678,8 +678,8 @@ public ResourceIterator<File> snapshotIndexFiles() throws IOException

private IndexPopulationJob newIndexPopulationJob()
{
MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider );
return new IndexPopulationJob( multiPopulator, monitor, schemaState );
MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider, schemaState );
return new IndexPopulationJob( multiPopulator, monitor );
}

private void startIndexPopulation( IndexPopulationJob job )
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.logging.LogProvider;

/**
Expand All @@ -35,7 +36,8 @@ private MultiPopulatorFactory()
{
}

public abstract MultipleIndexPopulator create( IndexStoreView storeView, LogProvider logProvider );
public abstract MultipleIndexPopulator create( IndexStoreView storeView, LogProvider logProvider,
SchemaState schemaState );

public static MultiPopulatorFactory forConfig( Config config )
{
Expand All @@ -46,18 +48,20 @@ public static MultiPopulatorFactory forConfig( Config config )
private static class SingleThreadedPopulatorFactory extends MultiPopulatorFactory
{
@Override
public MultipleIndexPopulator create( IndexStoreView storeView, LogProvider logProvider )
public MultipleIndexPopulator create( IndexStoreView storeView, LogProvider logProvider,
SchemaState schemaState )
{
return new MultipleIndexPopulator( storeView, logProvider );
return new MultipleIndexPopulator( storeView, logProvider, schemaState );
}
}

private static class MultiThreadedPopulatorFactory extends MultiPopulatorFactory
{
@Override
public MultipleIndexPopulator create( IndexStoreView storeView, LogProvider logProvider )
public MultipleIndexPopulator create( IndexStoreView storeView, LogProvider logProvider,
SchemaState schemaState )
{
return new BatchingMultipleIndexPopulator( storeView, logProvider );
return new BatchingMultipleIndexPopulator( storeView, logProvider, schemaState );
}
}
}
Expand Up @@ -46,6 +46,7 @@
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.IndexSample;
Expand Down Expand Up @@ -102,13 +103,15 @@ public class MultipleIndexPopulator implements IndexPopulator
private final IndexStoreView storeView;
private final LogProvider logProvider;
protected final Log log;
private final SchemaState schemaState;
private StoreScan<IndexPopulationFailedKernelException> storeScan;

public MultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider )
public MultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider, SchemaState schemaState )
{
this.storeView = storeView;
this.logProvider = logProvider;
this.log = logProvider.getLog( IndexPopulationJob.class );
this.schemaState = schemaState;
}

IndexPopulation addPopulator(
Expand Down Expand Up @@ -576,6 +579,7 @@ void flip() throws FlipFailedKernelException
IndexSample sample = populator.sampleResult();
storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(), sample.indexSize() );
populator.close( true );
schemaState.clear();
return true;
}
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeStore;
Expand Down Expand Up @@ -88,7 +89,8 @@ public void populateFromQueueDoesNothingIfThresholdNotReached() throws Exception
setProperty( QUEUE_THRESHOLD_NAME, 5 );

BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator(
mock( IndexStoreView.class ), immediateExecutor(), NullLogProvider.getInstance() );
mock( IndexStoreView.class ), immediateExecutor(), NullLogProvider.getInstance(),
mock( SchemaState.class ) );

IndexPopulator populator = addPopulator( batchingPopulator, index1 );
IndexUpdater updater = mock( IndexUpdater.class );
Expand Down Expand Up @@ -117,7 +119,7 @@ public void populateFromQueuePopulatesWhenThresholdReached() throws Exception
NeoStoreIndexStoreView storeView =
new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, neoStores );
BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator(
storeView, immediateExecutor(), NullLogProvider.getInstance() );
storeView, immediateExecutor(), NullLogProvider.getInstance(), mock( SchemaState.class ) );

IndexPopulator populator1 = addPopulator( batchingPopulator, index1 );
IndexUpdater updater1 = mock( IndexUpdater.class );
Expand Down Expand Up @@ -152,7 +154,7 @@ public void executorShutdownAfterStoreScanCompletes() throws Exception
when( executor.awaitTermination( anyLong(), any() ) ).thenReturn( true );

BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView,
executor, NullLogProvider.getInstance() );
executor, NullLogProvider.getInstance(), mock( SchemaState.class ) );

StoreScan<IndexPopulationFailedKernelException> storeScan = batchingPopulator.indexAllNodes();
verify( executor, never() ).shutdown();
Expand All @@ -176,7 +178,7 @@ public void executorForcefullyShutdownIfStoreScanFails() throws Exception
when( executor.awaitTermination( anyLong(), any() ) ).thenReturn( true );

BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView,
executor, NullLogProvider.getInstance() );
executor, NullLogProvider.getInstance(), mock( SchemaState.class ) );

StoreScan<IndexPopulationFailedKernelException> storeScan = batchingPopulator.indexAllNodes();
verify( executor, never() ).shutdown();
Expand Down Expand Up @@ -205,7 +207,7 @@ public void pendingBatchesFlushedAfterStoreScan() throws Exception
IndexStoreView storeView = newStoreView( update1, update2, update3, update42 );

BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView,
sameThreadExecutor(), NullLogProvider.getInstance() );
sameThreadExecutor(), NullLogProvider.getInstance(), mock( SchemaState.class ) );

IndexPopulator populator1 = addPopulator( batchingPopulator, index1 );
IndexPopulator populator42 = addPopulator( batchingPopulator, index42 );
Expand All @@ -227,7 +229,7 @@ public void batchIsFlushedWhenThresholdReached() throws Exception
IndexStoreView storeView = newStoreView( update1, update2, update3 );

BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView,
sameThreadExecutor(), NullLogProvider.getInstance() );
sameThreadExecutor(), NullLogProvider.getInstance(), mock( SchemaState.class ) );

IndexPopulator populator = addPopulator( batchingPopulator, index1 );

Expand All @@ -253,7 +255,7 @@ public void populatorMarkedAsFailed() throws Exception
try
{
BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView, executor,
NullLogProvider.getInstance() );
NullLogProvider.getInstance(), mock( SchemaState.class ) );

populator = addPopulator( batchingPopulator, index1 );
List<IndexEntryUpdate<SchemaIndexDescriptor>> expected = forUpdates( index1, update1, update2 );
Expand Down Expand Up @@ -285,7 +287,7 @@ public void populatorMarkedAsFailedAndUpdatesNotAdded() throws Exception
RuntimeException batchFlushError = new RuntimeException( "Batch failed" );

BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView,
sameThreadExecutor(), NullLogProvider.getInstance() );
sameThreadExecutor(), NullLogProvider.getInstance(), mock( SchemaState.class ) );

IndexPopulator populator = addPopulator( batchingPopulator, index1 );
doThrow( batchFlushError ).when( populator ).add( forUpdates( index1, update3, update4 ) );
Expand All @@ -311,7 +313,7 @@ public void shouldApplyBatchesInParallel() throws Exception
IndexStoreView storeView = newStoreView( updates );
ExecutorService executor = sameThreadExecutor();
BatchingMultipleIndexPopulator batchingPopulator = new BatchingMultipleIndexPopulator( storeView,
executor, NullLogProvider.getInstance() );
executor, NullLogProvider.getInstance(), mock( SchemaState.class ) );
addPopulator( batchingPopulator, index1 );

// when
Expand Down
Expand Up @@ -638,8 +638,8 @@ private IndexPopulationJob newIndexPopulationJob( FailedIndexProxyFactory failur
long indexId = 0;
flipper.setFlipTarget( mock( IndexProxyFactory.class ) );

MultipleIndexPopulator multiPopulator = new MultipleIndexPopulator( storeView, logProvider );
IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR, stateHolder );
MultipleIndexPopulator multiPopulator = new MultipleIndexPopulator( storeView, logProvider, stateHolder );
IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR );
job.addPopulator( populator, indexId, new IndexMeta( indexId, descriptor, PROVIDER_DESCRIPTOR, NO_CAPABILITY ),
format( ":%s(%s)", FIRST.name(), name ), flipper, failureDelegateFactory );
return job;
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.internal.kernel.api.IndexCapability;
import org.neo4j.internal.kernel.api.InternalIndexState;
import org.neo4j.kernel.api.exceptions.index.ExceptionDuringFlipKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
Expand All @@ -37,12 +36,13 @@
import org.neo4j.kernel.api.schema.LabelSchemaDescriptor;
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.storageengine.api.schema.PopulationProgress;
import org.neo4j.values.storable.Values;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

public class IndexPopulationTest
{
Expand All @@ -57,7 +57,8 @@ public void mustFlipToFailedIfFailureToApplyLastBatchWhileFlipping() throws Exce
OnlineIndexProxy onlineProxy = onlineIndexProxy( storeView );
FlippableIndexProxy flipper = new FlippableIndexProxy();
flipper.setFlipTarget( () -> onlineProxy );
MultipleIndexPopulator multipleIndexPopulator = new MultipleIndexPopulator( storeView, logProvider );
MultipleIndexPopulator multipleIndexPopulator = new MultipleIndexPopulator( storeView, logProvider,
mock( SchemaState.class ) );
MultipleIndexPopulator.IndexPopulation indexPopulation =
multipleIndexPopulator.addPopulator( populator, 0, dummyMeta(), flipper, t -> failedProxy, "userDescription" );
multipleIndexPopulator.queue( someUpdate() );
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator.IndexPopulation;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.IndexSample;
Expand Down Expand Up @@ -78,6 +79,8 @@ public class MultipleIndexPopulatorTest
private StoreScan storeScan;
@Mock( answer = Answers.RETURNS_MOCKS )
private LogProvider logProvider;
@Mock
private SchemaState schemaState;
@InjectMocks
private MultipleIndexPopulator multipleIndexPopulator;

Expand Down Expand Up @@ -375,6 +378,7 @@ public void testIndexFlip() throws IOException
verify( indexPopulator2 ).close( true );
verify( indexPopulator2 ).sampleResult();
verify( indexStoreView ).replaceIndexCounts( anyLong(), anyLong(), anyLong(), anyLong() );
verify( schemaState ).clear();
}

@Test
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.InlineNodeLabels;
import org.neo4j.kernel.impl.store.NeoStores;
Expand Down Expand Up @@ -85,7 +86,7 @@ public void updateForHigherNodeIgnoredWhenUsingFullNodeStoreScan()

ProcessListenableNeoStoreIndexView
storeView = new ProcessListenableNeoStoreIndexView( LockService.NO_LOCK_SERVICE, neoStores );
MultipleIndexPopulator indexPopulator = new MultipleIndexPopulator( storeView, logProvider );
MultipleIndexPopulator indexPopulator = new MultipleIndexPopulator( storeView, logProvider, mock( SchemaState.class ) );

storeView.setProcessListener( new NodeUpdateProcessListener( indexPopulator ) );

Expand Down

0 comments on commit e5725d1

Please sign in to comment.