Skip to content

Commit

Permalink
Allow concurrent index drops during indexes forcing operation
Browse files Browse the repository at this point in the history
Currently index drop operation that will be performed concurrently with checkpoint
can fail index forcing operation.
This PR will change that behaviour and will add additional checks during forcing of indexes state
to allow both operations to succeed.
The case when index can't be forced for some other reason then drop will still fail index forcing.

Increase default failure tolerance to 10. Introduce specific feature toggle.
Since 3 was too small and non critical error with indexes caused database health update that require env restart.
  • Loading branch information
MishaDemianenko committed May 15, 2017
1 parent 3a8d8c5 commit fd281bd
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 11 deletions.
Expand Up @@ -189,7 +189,7 @@ private void openCall( String name )
} }
else else
{ {
throw new IllegalStateException( "Cannot call " + name + "() before index has been started" ); throw new IllegalStateException("Cannot call " + name + "() when index state is " + state.get() );
} }
} }


Expand Down
Expand Up @@ -117,7 +117,7 @@ public IndexProxy removeIndexProxy( long indexId )
return removedProxy; return removedProxy;
} }


public void foreachIndexProxy( BiConsumer<Long, IndexProxy> consumer ) public void forEachIndexProxy( BiConsumer<Long, IndexProxy> consumer )
{ {
for ( Map.Entry<Long, IndexProxy> entry : indexesById.entrySet() ) for ( Map.Entry<Long, IndexProxy> entry : indexesById.entrySet() )
{ {
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer;


import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongSet; import org.neo4j.collection.primitive.PrimitiveLongSet;
Expand Down Expand Up @@ -242,7 +243,7 @@ public void start() throws Exception
Map<InternalIndexState, List<IndexLogRecord>> indexStates = new EnumMap<>( InternalIndexState.class ); Map<InternalIndexState, List<IndexLogRecord>> indexStates = new EnumMap<>( InternalIndexState.class );


// Find all indexes that are not already online, do not require rebuilding, and create them // Find all indexes that are not already online, do not require rebuilding, and create them
indexMap.foreachIndexProxy( ( indexId, proxy ) -> indexMap.forEachIndexProxy( ( indexId, proxy ) ->
{ {
InternalIndexState state = proxy.getState(); InternalIndexState state = proxy.getState();
IndexDescriptor descriptor = proxy.getDescriptor(); IndexDescriptor descriptor = proxy.getDescriptor();
Expand Down Expand Up @@ -656,17 +657,31 @@ public void validateIndex( long indexId )


public void forceAll() public void forceAll()
{ {
for ( IndexProxy index : indexMapRef.getAllIndexProxies() ) indexMapRef.indexMapSnapshot().forEachIndexProxy( forceIndexProxy() );
}

private BiConsumer<Long,IndexProxy> forceIndexProxy()
{
return ( id, indexProxy ) ->
{ {
try try
{ {
index.force(); indexProxy.force();
} }
catch ( IOException e ) catch ( Exception e )
{ {
throw new UnderlyingStorageException( "Unable to force " + index, e ); try
{
IndexProxy proxy = indexMapRef.getIndexProxy( id );
throw new UnderlyingStorageException( "Unable to force " + proxy, e );
}
catch ( IndexNotFoundKernelException infe )
{
// index was dropped while we where try to flush it, we can continue to flush other indexes
}

} }
} };
} }


private void closeAllIndexes() private void closeAllIndexes()
Expand All @@ -679,7 +694,7 @@ private void closeAllIndexes()
{ {
indexStopFutures.add( index.close() ); indexStopFutures.add( index.close() );
} }
catch ( IOException e ) catch ( Exception e )
{ {
log.error( "Unable to close index", e ); log.error( "Unable to close index", e );
} }
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint; import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint;
Expand All @@ -38,7 +39,8 @@ public class CheckPointScheduler extends LifecycleAdapter
* The max number of consecutive check point failures that can be tolerated before treating * The max number of consecutive check point failures that can be tolerated before treating
* check point failures more seriously, with a panic. * check point failures more seriously, with a panic.
*/ */
static final int MAX_CONSECUTIVE_FAILURES_TOLERANCE = 3; static final int MAX_CONSECUTIVE_FAILURES_TOLERANCE =
FeatureToggles.getInteger( CheckPointScheduler.class, "failure_tolerance", 10 );


private final CheckPointer checkPointer; private final CheckPointer checkPointer;
private final IOLimiter ioLimiter; private final IOLimiter ioLimiter;
Expand Down
Expand Up @@ -25,16 +25,19 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
Expand Down Expand Up @@ -70,8 +73,10 @@
import org.neo4j.kernel.api.schema.index.IndexDescriptorFactory; import org.neo4j.kernel.api.schema.index.IndexDescriptorFactory;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingController;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingMode; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingMode;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.IndexRule; import org.neo4j.kernel.impl.store.record.IndexRule;
import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant; import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand; import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
Expand All @@ -85,6 +90,7 @@
import org.neo4j.kernel.lifecycle.LifecycleException; import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.AssertableLogProvider.LogMatcherBuilder; import org.neo4j.logging.AssertableLogProvider.LogMatcherBuilder;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.register.Register.DoubleLongRegister; import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.IndexSample;
Expand All @@ -110,6 +116,7 @@
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
Expand Down Expand Up @@ -141,6 +148,8 @@ public class IndexingServiceTest
{ {
@Rule @Rule
public final LifeRule life = new LifeRule(); public final LifeRule life = new LifeRule();
@Rule
public ExpectedException expectedException = ExpectedException.none();


private static final LogMatcherBuilder logMatch = inLog( IndexingService.class ); private static final LogMatcherBuilder logMatch = inLog( IndexingService.class );
private static final Runnable DO_NOTHING_CALLBACK = () -> {}; private static final Runnable DO_NOTHING_CALLBACK = () -> {};
Expand Down Expand Up @@ -1026,6 +1035,67 @@ public void shouldLogIndexStateOutliersOnStart() throws Exception
logProvider.assertNone( logMatch.info( "IndexingService.start: index 3 on :Label3(prop) is ONLINE" ) ); logProvider.assertNone( logMatch.info( "IndexingService.start: index 3 on :Label3(prop) is ONLINE" ) );
} }


@Test
public void flushAllIndexesWhileSomeOfThemDropped() throws IOException
{
IndexMapReference indexMapReference = new IndexMapReference();
IndexMap indexMap = indexMapReference.indexMapSnapshot();

IndexProxy validIndex = createIndexProxyMock();
indexMap.putIndexProxy( 1, validIndex );
indexMap.putIndexProxy( 2, validIndex );
IndexProxy deletedIndexProxy = createIndexProxyMock();
indexMap.putIndexProxy( 3, deletedIndexProxy );
indexMap.putIndexProxy( 4, validIndex );
indexMap.putIndexProxy( 5, validIndex );

indexMapReference.setIndexMap( indexMap );

doAnswer( invocation ->
{
indexMap.removeIndexProxy( 3 );
throw new RuntimeException( "Index deleted." );
} ).when( deletedIndexProxy ).force();

IndexingService indexingService = createIndexServiceWithCustomIndexMap( indexMapReference );

indexingService.forceAll();
verify( validIndex, times( 4 ) ).force();
}

@Test
public void failForceAllWhenOneOfTheIndexesFailToForce() throws IOException
{
IndexMapReference indexMapReference = new IndexMapReference();
IndexMap indexMap = indexMapReference.indexMapSnapshot();

IndexProxy validIndex = createIndexProxyMock();

indexMap.putIndexProxy( 1, validIndex );
indexMap.putIndexProxy( 2, validIndex );
IndexProxy strangeIndexProxy = createIndexProxyMock();
indexMap.putIndexProxy( 3, strangeIndexProxy );
indexMap.putIndexProxy( 4, validIndex );
indexMap.putIndexProxy( 5, validIndex );
doThrow( new UncheckedIOException( new IOException( "Can't force" ) ) ).when( strangeIndexProxy ).force();

indexMapReference.setIndexMap( indexMap );

IndexingService indexingService = createIndexServiceWithCustomIndexMap( indexMapReference );

expectedException.expectMessage( "Unable to force" );
expectedException.expect( UnderlyingStorageException.class );
indexingService.forceAll();
}

private IndexProxy createIndexProxyMock()
{
IndexProxy proxy = mock( IndexProxy.class );
IndexDescriptor descriptor = IndexDescriptorFactory.forLabel( 1, 2 );
when( proxy.getDescriptor() ).thenReturn( descriptor );
return proxy;
}

private static Matcher<? extends Throwable> causedBy( final Throwable exception ) private static Matcher<? extends Throwable> causedBy( final Throwable exception )
{ {
return new TypeSafeMatcher<Throwable>() return new TypeSafeMatcher<Throwable>()
Expand Down Expand Up @@ -1330,4 +1400,13 @@ private IndexRule constraintIndexRule( long ruleId, int labelId, int propertyKey
return IndexRule.constraintIndexRule( return IndexRule.constraintIndexRule(
ruleId, IndexDescriptorFactory.uniqueForLabel( labelId, propertyKeyId ), providerDescriptor, constraintId ); ruleId, IndexDescriptorFactory.uniqueForLabel( labelId, propertyKeyId ), providerDescriptor, constraintId );
} }

private IndexingService createIndexServiceWithCustomIndexMap( IndexMapReference indexMapReference )
{
return new IndexingService( mock( IndexProxyCreator.class ), mock( SchemaIndexProviderMap.class ),
indexMapReference, mock( IndexStoreView.class ), Collections.emptyList(),
mock( IndexSamplingController.class ), mock( TokenNameLookup.class ),
mock( JobScheduler.class ), mock( Runnable.class ), mock( MultiPopulatorFactory.class ),
NullLogProvider.getInstance(), IndexingService.NO_MONITOR );
}
} }
Expand Up @@ -22,6 +22,7 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand All @@ -43,6 +44,7 @@
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.SchemaStore; import org.neo4j.kernel.impl.store.SchemaStore;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.IndexRule; import org.neo4j.kernel.impl.store.record.IndexRule;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.storageengine.api.schema.PopulationProgress; import org.neo4j.storageengine.api.schema.PopulationProgress;
Expand All @@ -60,6 +62,8 @@ public class IndexingServiceIntegrationTest
private static final SchemaIndexProvider.Descriptor indexDescriptor = private static final SchemaIndexProvider.Descriptor indexDescriptor =
LuceneSchemaIndexProviderFactory.PROVIDER_DESCRIPTOR; LuceneSchemaIndexProviderFactory.PROVIDER_DESCRIPTOR;


@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule @Rule
public EphemeralFileSystemRule fileSystemRule = new EphemeralFileSystemRule(); public EphemeralFileSystemRule fileSystemRule = new EphemeralFileSystemRule();
private GraphDatabaseService database; private GraphDatabaseService database;
Expand Down Expand Up @@ -98,7 +102,7 @@ public void testManualIndexPopulation() throws IOException, IndexNotFoundKernelE
} }


@Test @Test
public void testSchemaIndexMatchIndexingService() throws IndexNotFoundKernelException public void testSchemaIndexMatchIndexingService() throws IndexNotFoundKernelException, IOException
{ {
try ( Transaction transaction = database.beginTx() ) try ( Transaction transaction = database.beginTx() )
{ {
Expand Down Expand Up @@ -128,6 +132,46 @@ public void testSchemaIndexMatchIndexingService() throws IndexNotFoundKernelExce
assertEquals( InternalIndexState.ONLINE, weatherIndex.getState()); assertEquals( InternalIndexState.ONLINE, weatherIndex.getState());
} }


@Test
public void failForceIndexesWhenOneOfTheIndexesIsBroken() throws Exception
{
String constraintLabelPrefix = "ConstraintLabel";
String constraintPropertyPrefix = "ConstraintProperty";
String indexLabelPrefix = "Label";
String indexPropertyPrefix = "Property";
for ( int i = 0; i < 10; i++ )
{
try ( Transaction transaction = database.beginTx() )
{
database.schema().constraintFor( Label.label( constraintLabelPrefix + i ) )
.assertPropertyIsUnique( constraintPropertyPrefix + i ).create();
database.schema().indexFor( Label.label( indexLabelPrefix + i ) ).on( indexPropertyPrefix + i ).create();
transaction.success();
}
}

try ( Transaction ignored = database.beginTx() )
{
database.schema().awaitIndexesOnline( 1, TimeUnit.MINUTES );
}

IndexingService indexingService = getIndexingService( database );

LabelTokenHolder labelTokenHolder = getLabelTokenHolder( database );
PropertyKeyTokenHolder propertyKeyTokenHolder = getPropertyKeyTokenHolder( database );

int indexLabel7 = labelTokenHolder.getIdByName( indexLabelPrefix + 7 );
int indexProperty7 = propertyKeyTokenHolder.getIdByName( indexPropertyPrefix + 7 );

IndexProxy index = indexingService.getIndexProxy( IndexDescriptorFactory.forLabel( indexLabel7, indexProperty7).schema() );

index.drop();

expectedException.expect( UnderlyingStorageException.class );
expectedException.expectMessage( "Unable to force" );
indexingService.forceAll();
}

private PropertyKeyTokenHolder getPropertyKeyTokenHolder( GraphDatabaseService database ) private PropertyKeyTokenHolder getPropertyKeyTokenHolder( GraphDatabaseService database )
{ {
return getDependencyResolver( database ).resolveDependency( PropertyKeyTokenHolder.class ); return getDependencyResolver( database ).resolveDependency( PropertyKeyTokenHolder.class );
Expand Down

0 comments on commit fd281bd

Please sign in to comment.