Skip to content

Commit

Permalink
Allow concurrent index drops during forcing
Browse files Browse the repository at this point in the history
Currently index drop operation that will be performed concurrently with checkpoint
can fail index forcing operation.
This PR will change that behaviour and will add additional checks during forcing of indexes state
to allow both operations to succeed.
Case when index can't be forced for some other reason then drop will still fail index forcing.
  • Loading branch information
MishaDemianenko committed Apr 25, 2017
1 parent c5b5d78 commit c7df1b4
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 10 deletions.
Expand Up @@ -178,7 +178,7 @@ private void openCall( String name )
throw new IllegalStateException("Cannot call " + name + "() after index has been closed" );
}
else
throw new IllegalStateException("Cannot call " + name + "() before index has been started" );
throw new IllegalStateException("Cannot call " + name + "() when index state is " + state.get() );
}

private void ensureNoOpenCalls(String name)
Expand Down
Expand Up @@ -74,7 +74,7 @@ public IndexProxy removeIndexProxy( long indexId )
return removedProxy;
}

public void foreachIndexProxy( BiConsumer<Long, IndexProxy> consumer )
public void forEachIndexProxy( BiConsumer<Long, IndexProxy> consumer )
{
for ( Map.Entry<Long, IndexProxy> entry : indexesById.entrySet() )
{
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongSet;
Expand Down Expand Up @@ -255,7 +256,7 @@ public void start() throws Exception
Map<InternalIndexState, List<IndexLogRecord>> indexStates = new EnumMap<>( InternalIndexState.class );

// Find all indexes that are not already online, do not require rebuilding, and create them
indexMap.foreachIndexProxy( ( indexId, proxy ) -> {
indexMap.forEachIndexProxy( ( indexId, proxy ) -> {
InternalIndexState state = proxy.getState();
IndexDescriptor descriptor = proxy.getDescriptor();
indexStates.computeIfAbsent( state, internalIndexState -> new ArrayList<>() )
Expand Down Expand Up @@ -699,17 +700,31 @@ public void validateIndex( long indexId ) throws IndexNotFoundKernelException, C

public void forceAll()
{
for ( IndexProxy index : indexMapRef.getAllIndexProxies() )
indexMapRef.indexMapSnapshot().forEachIndexProxy( forceIndexProxy() );
}

private BiConsumer<Long,IndexProxy> forceIndexProxy()
{
return ( id, indexProxy ) ->
{
try
{
index.force();
indexProxy.force();
}
catch ( IOException e )
catch ( Exception e )
{
throw new UnderlyingStorageException( "Unable to force " + index, e );
try
{
IndexProxy proxy = indexMapRef.getIndexProxy( id );
throw new UnderlyingStorageException( "Unable to force " + proxy, e );
}
catch ( IndexNotFoundKernelException infe )
{
// index was dropped while we where try to flush it, we can continue to flush other indexes
}

}
}
};
}

private void closeAllIndexes()
Expand All @@ -722,7 +737,7 @@ private void closeAllIndexes()
{
indexStopFutures.add( index.close() );
}
catch ( IOException e )
catch ( Exception e )
{
log.error( "Unable to close index", e );
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
Expand All @@ -33,9 +34,11 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -70,8 +73,10 @@
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingController;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingMode;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.IndexRule;
import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
Expand All @@ -85,6 +90,7 @@
import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.AssertableLogProvider.LogMatcherBuilder;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample;
Expand All @@ -109,6 +115,7 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
Expand Down Expand Up @@ -142,6 +149,8 @@ public class IndexingServiceTest
{
@Rule
public final LifeRule life = new LifeRule();
@Rule
public ExpectedException expectedException = ExpectedException.none();

private static final LogMatcherBuilder logMatch = inLog( IndexingService.class );
private static final Runnable DO_NOTHING_CALLBACK = () -> {};
Expand Down Expand Up @@ -1019,6 +1028,58 @@ public void shouldLogIndexStateOutliersOnStart() throws Exception
logProvider.assertNone( logMatch.info( "IndexingService.start: index 3 on :Label3(prop) is ONLINE" ) );
}

@Test
public void flushAllIndexesWhileSomeOfThemDropped() throws IOException
{
IndexMapReference indexMapReference = new IndexMapReference();
IndexMap indexMap = indexMapReference.indexMapSnapshot();

IndexProxy validIndex = mock( IndexProxy.class );
indexMap.putIndexProxy( 1, validIndex );
indexMap.putIndexProxy( 2, validIndex );
IndexProxy deletedIndexProxy = mock( IndexProxy.class );
indexMap.putIndexProxy( 3, deletedIndexProxy );
indexMap.putIndexProxy( 4, validIndex );
indexMap.putIndexProxy( 5, validIndex );

indexMapReference.setIndexMap( indexMap );

doAnswer( invocation ->
{
indexMap.removeIndexProxy( 3 );
throw new RuntimeException( "Index deleted." );
} ).when( deletedIndexProxy ).force();

IndexingService indexingService = createIndexServiceWithCustomIndexMap( indexMapReference );

indexingService.forceAll();
verify( validIndex, times( 4 ) ).force();
}

@Test
public void failForceAllWhenOneOfTheIndexesFailToForce() throws IOException
{
IndexMapReference indexMapReference = new IndexMapReference();
IndexMap indexMap = indexMapReference.indexMapSnapshot();

IndexProxy validIndex = mock( IndexProxy.class );
indexMap.putIndexProxy( 1, validIndex );
indexMap.putIndexProxy( 2, validIndex );
IndexProxy strangeIndexProxy = mock( IndexProxy.class );
indexMap.putIndexProxy( 3, strangeIndexProxy );
indexMap.putIndexProxy( 4, validIndex );
indexMap.putIndexProxy( 5, validIndex );
doThrow( new UncheckedIOException( new IOException( "Can't force" ) ) ).when( strangeIndexProxy ).force();

indexMapReference.setIndexMap( indexMap );

IndexingService indexingService = createIndexServiceWithCustomIndexMap( indexMapReference );

expectedException.expectMessage( "Unable to force" );
expectedException.expect( UnderlyingStorageException.class );
indexingService.forceAll();
}

private static Matcher<? extends Throwable> causedBy( final Throwable exception )
{
return new TypeSafeMatcher<Throwable>()
Expand Down Expand Up @@ -1287,4 +1348,13 @@ private Answer nodeUpdatesAnswer( NodePropertyUpdate... updates )
return null;
};
}

private IndexingService createIndexServiceWithCustomIndexMap( IndexMapReference indexMapReference )
{
return new IndexingService( mock( IndexProxyCreator.class ), mock( SchemaIndexProviderMap.class ),
indexMapReference, mock( IndexStoreView.class ), Collections.emptyList(),
mock( IndexSamplingController.class ), mock( TokenNameLookup.class ),
mock( JobScheduler.class ), mock( Runnable.class ), mock( MultiPopulatorFactory.class ),
NullLogProvider.getInstance(), IndexingService.NO_MONITOR );
}
}
Expand Up @@ -22,6 +22,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +43,7 @@
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.SchemaStore;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.IndexRule;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.storageengine.api.schema.PopulationProgress;
Expand All @@ -59,6 +61,8 @@ public class IndexingServiceIntegrationTest
private static final SchemaIndexProvider.Descriptor indexDescriptor =
LuceneSchemaIndexProviderFactory.PROVIDER_DESCRIPTOR;

@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public EphemeralFileSystemRule fileSystemRule = new EphemeralFileSystemRule();
private GraphDatabaseService database;
Expand Down Expand Up @@ -96,7 +100,7 @@ public void testManualIndexPopulation() throws IOException, IndexNotFoundKernelE
}

@Test
public void testSchemaIndexMatchIndexingService() throws IndexNotFoundKernelException
public void testSchemaIndexMatchIndexingService() throws IndexNotFoundKernelException, IOException
{
try ( Transaction transaction = database.beginTx() )
{
Expand Down Expand Up @@ -124,6 +128,46 @@ public void testSchemaIndexMatchIndexingService() throws IndexNotFoundKernelExce
assertEquals( InternalIndexState.ONLINE, weatherIndex.getState());
}

@Test
public void failForceIndexesWhenOneOfTheIndexesIsBroken() throws Exception
{
String constraintLabelPrefix = "ConstraintLabel";
String constraintPropertyPrefix = "ConstraintProperty";
String indexLabelPrefix = "Label";
String indexPropertyPrefix = "Property";
for ( int i = 0; i < 10; i++ )
{
try ( Transaction transaction = database.beginTx() )
{
database.schema().constraintFor( Label.label( constraintLabelPrefix + i ) )
.assertPropertyIsUnique( constraintPropertyPrefix + i ).create();
database.schema().indexFor( Label.label( indexLabelPrefix + i ) ).on( indexPropertyPrefix + i ).create();
transaction.success();
}
}

try ( Transaction ignored = database.beginTx() )
{
database.schema().awaitIndexesOnline( 1, TimeUnit.MINUTES );
}

IndexingService indexingService = getIndexingService( database );

LabelTokenHolder labelTokenHolder = getLabelTokenHolder( database );
PropertyKeyTokenHolder propertyKeyTokenHolder = getPropertyKeyTokenHolder( database );

int indexLabel7 = labelTokenHolder.getIdByName( indexLabelPrefix + 7 );
int indexProperty7 = propertyKeyTokenHolder.getIdByName( indexPropertyPrefix + 7 );

IndexProxy index = indexingService.getIndexProxy( new IndexDescriptor( indexLabel7, indexProperty7) );

index.drop();

expectedException.expect( UnderlyingStorageException.class );
expectedException.expectMessage( "Unable to force" );
indexingService.forceAll();
}

private PropertyKeyTokenHolder getPropertyKeyTokenHolder( GraphDatabaseService database )
{
return getDependencyResolver( database ).resolveDependency( PropertyKeyTokenHolder.class );
Expand Down

0 comments on commit c7df1b4

Please sign in to comment.