Skip to content

Commit

Permalink
Proper index recovery
Browse files Browse the repository at this point in the history
This commit changes the way recovery works, with the main driving force being
to have index updates replayed as they were during normal transaction
application before the crash or otherwise unclean shutdown.

Previously index recovery was special and worked by recording which nodes
were changed in recovered transactions, followed by a complete reindex
of those nodes in all existing indexes. Besides the downside of requiring
special recovery code, it also put on the index the requirement of being able
to efficiently delete index entries about a certain node id, i.e. to be able to
efficiently do lookups on node id. This worked in the Lucene case, but doesn't
work for the native number index, which only keys on the property values.
  Index recovery worked this way because the act of producing index updates
from a transaction in many cases requires reading data from the store.
It would be incorrect and often impossible to read from the store while it was
recovering.

Now this has changed so that indexes participate in and see normal index updates
during recovery. The enabler for this is to first do a reverse recovery where
all the "before" versions of all transactions since the last checkpoint are
applied in reverse order onto the neostore only. This takes the store back in
time to how it looked at the checkpoint and into a consistent state.
From there a forward recovery is performed where indexes participate as they
would in normal transaction application.

Interesting to note is that these changes, together with some additional ones,
have made recovery both faster, due to:

- The previous reindexing in the end was rather slow.
- LockService was used when applying recovered transactions, to no benefit.

... and less memory consuming, due to:

- Lowering of recovery transaction queue size from 10k --> 100, where there
  was no observed performance benefit having the queue so large. The amount
  of memory required to hold 10k large transactions could be gigabytes.
- Memory efficient implementation of reverse transaction cursor.

So recovery is now more straightforward, faster and more memory efficient.
And above all, it works nicely with our native schema indexes.
  • Loading branch information
tinwelint committed Aug 31, 2017
1 parent e9a1b18 commit 1208e3d
Show file tree
Hide file tree
Showing 73 changed files with 2,141 additions and 632 deletions.
16 changes: 16 additions & 0 deletions community/common/src/test/java/org/neo4j/test/Randoms.java
Expand Up @@ -20,8 +20,10 @@
package org.neo4j.test; package org.neo4j.test;


import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;


import static java.lang.Integer.bitCount; import static java.lang.Integer.bitCount;
import static java.lang.Math.abs; import static java.lang.Math.abs;
Expand Down Expand Up @@ -216,6 +218,20 @@ public <T> T among( T[] among )
return among[random.nextInt( among.length )]; return among[random.nextInt( among.length )];
} }


/**
 * Picks one element from the given list, at an index drawn from
 * {@code random.nextInt( among.size() )}.
 *
 * NOTE(review): {@code random} is presumably a {@link java.util.Random}, in which case an empty
 * list makes {@code nextInt( 0 )} throw {@link IllegalArgumentException} -- confirm against the field.
 *
 * @param among list to pick an element from.
 * @param <T> type of the elements in the list.
 * @return the element found at the drawn index.
 */
public <T> T among( List<T> among )
{
    int index = random.nextInt( among.size() );
    return among.get( index );
}

/**
 * Picks one element from the given list, using {@code among( List )}, and hands it to
 * {@code action}. Does nothing at all if the list is empty.
 *
 * @param among list to pick an element from.
 * @param action receiver of the picked element; not called for an empty list.
 * @param <T> type of the elements in the list.
 */
public <T> void among( List<T> among, Consumer<T> action )
{
    if ( among.isEmpty() )
    {
        // Nothing to pick from, so the action is never invoked
        return;
    }
    action.accept( among( among ) );
}

public Number numberPropertyValue() public Number numberPropertyValue()
{ {
int type = random.nextInt( 6 ); int type = random.nextInt( 6 );
Expand Down
12 changes: 12 additions & 0 deletions community/common/src/test/java/org/neo4j/test/rule/RandomRule.java
Expand Up @@ -27,7 +27,9 @@
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.function.Consumer;


import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.test.Randoms; import org.neo4j.test.Randoms;
Expand Down Expand Up @@ -188,6 +190,16 @@ public <T> T among( T[] among )
return randoms.among( among ); return randoms.among( among );
} }


/**
 * Picks an element from the given list by delegating to {@code randoms.among( among )}.
 *
 * @param among list to pick an element from.
 * @param <T> type of the elements in the list.
 * @return whatever element the delegate picked.
 */
public <T> T among( List<T> among )
{
return randoms.among( among );
}

/**
 * Delegates to {@code randoms.among( among, action )}, passing both arguments through unchanged.
 *
 * @param among list handed to the delegate.
 * @param action consumer handed to the delegate.
 * @param <T> type of the elements in the list.
 */
public <T> void among( List<T> among, Consumer<T> action )
{
randoms.among( among, action );
}

public Number numberPropertyValue() public Number numberPropertyValue()
{ {
return randoms.numberPropertyValue(); return randoms.numberPropertyValue();
Expand Down
28 changes: 25 additions & 3 deletions community/common/src/test/java/org/neo4j/test/rule/RepeatRule.java
Expand Up @@ -28,6 +28,8 @@
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;


import static java.lang.Integer.max;

/** /**
* Set a test to loop a number of times. If you find yourself using this in a production test, you are probably doing * Set a test to loop a number of times. If you find yourself using this in a production test, you are probably doing
* something wrong. * something wrong.
Expand All @@ -46,24 +48,44 @@ public class RepeatRule implements TestRule
int times(); int times();
} }


private final boolean printRepeats;
private final int defaultTimes;

private int count; private int count;


/**
 * Creates a rule with {@code printRepeats} set to {@code false} and a default repeat count of 1,
 * by delegating to {@code RepeatRule( boolean, int )}.
 */
public RepeatRule()
{
this( false, 1 );
}

/**
 * Creates a rule with explicit settings for printing and for the default repeat count.
 *
 * @param printRepeats stored in the {@code printRepeats} field; when {@code true} the repeat statement
 * prints a line with the test name and iteration number to {@code System.out} before each iteration.
 * @param defaultRepeats stored in the {@code defaultTimes} field; {@code apply} repeats a test when this
 * is greater than 1 even if the test has no {@code Repeat} annotation, and uses the larger of this
 * value and the annotation's {@code times()}.
 */
public RepeatRule( boolean printRepeats, int defaultRepeats )
{
this.printRepeats = printRepeats;
this.defaultTimes = defaultRepeats;
}

private class RepeatStatement extends Statement private class RepeatStatement extends Statement
{ {
private final int times; private final int times;
private final Statement statement; private final Statement statement;
private final String testName;


private RepeatStatement( int times, Statement statement ) private RepeatStatement( int times, Statement statement, Description testDescription )
{ {
this.times = times; this.times = times;
this.statement = statement; this.statement = statement;
this.testName = testDescription.getDisplayName();
} }


@Override @Override
public void evaluate() throws Throwable public void evaluate() throws Throwable
{ {
for ( count = 0; count < times; count++ ) for ( count = 0; count < times; count++ )
{ {
if ( printRepeats )
{
System.out.println( testName + " iteration " + (count + 1) + "/" + times );
}
statement.evaluate(); statement.evaluate();
} }
} }
Expand All @@ -73,9 +95,9 @@ public void evaluate() throws Throwable
public Statement apply( Statement base, Description description ) public Statement apply( Statement base, Description description )
{ {
Repeat repeat = description.getAnnotation( Repeat.class ); Repeat repeat = description.getAnnotation( Repeat.class );
if ( repeat != null ) if ( repeat != null || defaultTimes > 1 )
{ {
return new RepeatStatement( repeat.times(), base ); return new RepeatStatement( max( repeat != null ? repeat.times() : 1, defaultTimes ), base, description );
} }
return base; return base;
} }
Expand Down
Expand Up @@ -74,12 +74,15 @@
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate; import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory; import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptorFactory; import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptorFactory;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.annotations.Documented; import org.neo4j.kernel.impl.annotations.Documented;
import org.neo4j.kernel.impl.api.KernelStatement; import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode; import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.NodeUpdates;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.AbstractDynamicStore; import org.neo4j.kernel.impl.store.AbstractDynamicStore;
import org.neo4j.kernel.impl.store.DynamicRecordAllocator; import org.neo4j.kernel.impl.store.DynamicRecordAllocator;
import org.neo4j.kernel.impl.store.NodeLabelsField; import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand All @@ -100,6 +103,7 @@
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.store.record.RelationshipTypeTokenRecord; import org.neo4j.kernel.impl.store.record.RelationshipTypeTokenRecord;
import org.neo4j.kernel.impl.transaction.state.storeview.NeoStoreIndexStoreView;
import org.neo4j.kernel.impl.util.Bits; import org.neo4j.kernel.impl.util.Bits;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.FormattedLog; import org.neo4j.logging.FormattedLog;
Expand Down Expand Up @@ -558,15 +562,26 @@ public void shouldReportNodesThatAreNotIndexed() throws Exception
IndexSamplingConfig samplingConfig = new IndexSamplingConfig( Config.defaults() ); IndexSamplingConfig samplingConfig = new IndexSamplingConfig( Config.defaults() );
Iterator<IndexRule> indexRuleIterator = Iterator<IndexRule> indexRuleIterator =
new SchemaStorage( fixture.directStoreAccess().nativeStores().getSchemaStore() ).indexesGetAll(); new SchemaStorage( fixture.directStoreAccess().nativeStores().getSchemaStore() ).indexesGetAll();
NeoStoreIndexStoreView storeView = new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE,
fixture.directStoreAccess().nativeStores().getRawNeoStores() );
while ( indexRuleIterator.hasNext() ) while ( indexRuleIterator.hasNext() )
{ {
IndexRule indexRule = indexRuleIterator.next(); IndexRule indexRule = indexRuleIterator.next();
IndexDescriptor descriptor = indexRule.getIndexDescriptor();
IndexAccessor accessor = fixture.directStoreAccess().indexes(). IndexAccessor accessor = fixture.directStoreAccess().indexes().
apply( indexRule.getProviderDescriptor() ).getOnlineAccessor( apply( indexRule.getProviderDescriptor() ).getOnlineAccessor(
indexRule.getId(), indexRule.getIndexDescriptor(), samplingConfig ); indexRule.getId(), descriptor, samplingConfig );
IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE ); try ( IndexUpdater updater = accessor.newUpdater( IndexUpdateMode.ONLINE ) )
updater.remove( asPrimitiveLongSet( indexedNodes ) ); {
updater.close(); for ( long nodeId : indexedNodes )
{
NodeUpdates updates = storeView.nodeAsUpdates( nodeId );
for ( IndexEntryUpdate update : updates.forIndexKeys( asList( descriptor ) ) )
{
updater.process( IndexEntryUpdate.remove( nodeId, descriptor, update.values() ) );
}
}
}
accessor.force(); accessor.force();
accessor.close(); accessor.close();
} }
Expand Down
10 changes: 10 additions & 0 deletions community/kernel/src/main/java/org/neo4j/helpers/ArrayUtil.java
Expand Up @@ -458,6 +458,16 @@ public static <T> T[] without( T[] source, T... toRemove )
return length == result.length ? result : Arrays.copyOf( result, length ); return length == result.length ? result : Arrays.copyOf( result, length );
} }


/**
 * Reverses the order of the items in the given array. The reversal happens in place, i.e. the
 * supplied array itself is modified and no new array is created. Arrays of length 0 or 1 are
 * left untouched.
 *
 * @param array the array whose items to reverse.
 * @param <T> type of the items in the array.
 */
public static <T> void reverse( T[] array )
{
    int last = array.length - 1;
    // Only the first half needs visiting; each step swaps an item with its mirror position
    for ( int i = 0; i < array.length / 2; i++ )
    {
        T mirrored = array[last - i];
        array[last - i] = array[i];
        array[i] = mirrored;
    }
}

private ArrayUtil() private ArrayUtil()
{ // No instances allowed { // No instances allowed
} }
Expand Down
Expand Up @@ -21,7 +21,6 @@


import java.io.IOException; import java.io.IOException;


import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;


/** /**
Expand All @@ -38,6 +37,4 @@ public interface IndexUpdater extends AutoCloseable


@Override @Override
void close() throws IOException, IndexEntryConflictException; void close() throws IOException, IndexEntryConflictException;

void remove( PrimitiveLongSet nodeIds ) throws IOException;
} }
Expand Up @@ -142,6 +142,11 @@ protected SchemaIndexProvider( Descriptor descriptor, int priority )
this.providerDescriptor = descriptor; this.providerDescriptor = descriptor;
} }


/**
 * @return the value of this provider's {@code priority} field.
 * NOTE(review): presumably the priority passed to the constructor -- the assignment is not visible here.
 */
public int priority()
{
return this.priority;
}

/** /**
* Used for initially populating a created index, using batch insertion. * Used for initially populating a created index, using batch insertion.
*/ */
Expand Down
Expand Up @@ -57,6 +57,7 @@ public class FlippableIndexProxy implements IndexProxy
// But it turns out that that may not be the case. F.ex. ReentrantReadWriteLock // But it turns out that that may not be the case. F.ex. ReentrantReadWriteLock
// code uses unsafe compareAndSwap that sort of circumvents an equivalent of a volatile read. // code uses unsafe compareAndSwap that sort of circumvents an equivalent of a volatile read.
private volatile IndexProxy delegate; private volatile IndexProxy delegate;
private boolean started;


public FlippableIndexProxy() public FlippableIndexProxy()
{ {
Expand All @@ -75,6 +76,7 @@ public void start() throws IOException
try try
{ {
delegate.start(); delegate.start();
started = true;
} }
finally finally
{ {
Expand Down Expand Up @@ -402,6 +404,10 @@ public void flip( Callable<Void> actionDuringFlip, FailedIndexProxyFactory failu
{ {
actionDuringFlip.call(); actionDuringFlip.call();
this.delegate = flipTarget.create(); this.delegate = flipTarget.create();
if ( started )
{
this.delegate.start();
}
} }
catch ( Exception e ) catch ( Exception e )
{ {
Expand Down
Expand Up @@ -26,16 +26,12 @@
import java.util.EnumMap; import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;


import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.TokenNameLookup; import org.neo4j.kernel.api.TokenNameLookup;
Expand Down Expand Up @@ -100,7 +96,6 @@ public class IndexingService extends LifecycleAdapter implements IndexingUpdateS
private final MultiPopulatorFactory multiPopulatorFactory; private final MultiPopulatorFactory multiPopulatorFactory;
private final LogProvider logProvider; private final LogProvider logProvider;
private final Monitor monitor; private final Monitor monitor;
private final PrimitiveLongSet recoveredNodeIds = Primitive.longSet( 20 );
private final JobScheduler scheduler; private final JobScheduler scheduler;
private final SchemaState schemaState; private final SchemaState schemaState;


Expand All @@ -114,12 +109,6 @@ enum State


public interface Monitor public interface Monitor
{ {
void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds );

void applyingRecoveredUpdate( IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate );

void recoveredUpdatesApplied();

void populationCompleteOn( IndexDescriptor descriptor ); void populationCompleteOn( IndexDescriptor descriptor );


void indexPopulationScanComplete(); void indexPopulationScanComplete();
Expand All @@ -129,22 +118,6 @@ public interface Monitor


public static class MonitorAdapter implements Monitor public static class MonitorAdapter implements Monitor
{ {
@Override
public void recoveredUpdatesApplied()
{ // Do nothing
}

@Override
public void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds )
{ // Do nothing
}

@Override
public void applyingRecoveredUpdate( IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate )
{
// empty
}

@Override @Override
public void populationCompleteOn( IndexDescriptor descriptor ) public void populationCompleteOn( IndexDescriptor descriptor )
{ // Do nothing { // Do nothing
Expand Down Expand Up @@ -246,7 +219,6 @@ public void start() throws Exception
{ {
state = State.STARTING; state = State.STARTING;


applyRecoveredUpdates();
IndexMap indexMap = indexMapRef.indexMapSnapshot(); IndexMap indexMap = indexMapRef.indexMapSnapshot();


final Map<Long,RebuildingIndexDescriptor> rebuildingDescriptors = new HashMap<>(); final Map<Long,RebuildingIndexDescriptor> rebuildingDescriptors = new HashMap<>();
Expand Down Expand Up @@ -419,9 +391,9 @@ public void apply( IndexUpdates updates ) throws IOException, IndexEntryConflict
{ {
if ( state == State.NOT_STARTED ) if ( state == State.NOT_STARTED )
{ {
// We're in recovery, which means we'll merely be noting which entity ids are to be refreshed // We're in recovery, which means we'll be telling indexes to apply with additional care for making
// and we'll refresh them completely after recovery completes. // idempotent changes.
updates.collectUpdatedNodeIds( recoveredNodeIds ); apply( updates, IndexUpdateMode.RECOVERY );
} }
else if ( state == State.RUNNING || state == State.STARTING ) else if ( state == State.RUNNING || state == State.STARTING )
{ {
Expand Down Expand Up @@ -518,37 +490,6 @@ public void createIndexes( IndexRule... rules ) throws IOException
indexMapRef.setIndexMap( indexMap ); indexMapRef.setIndexMap( indexMap );
} }


private void applyRecoveredUpdates() throws IOException, IndexEntryConflictException
{
if ( log.isDebugEnabled() )
{
log.debug( "Applying recovered updates: " + recoveredNodeIds );
}
monitor.applyingRecoveredData( recoveredNodeIds );
if ( !recoveredNodeIds.isEmpty() )
{
try ( IndexUpdaterMap updaterMap = indexMapRef.createIndexUpdaterMap( IndexUpdateMode.RECOVERY ) )
{
for ( IndexUpdater updater : updaterMap )
{
updater.remove( recoveredNodeIds );
}

Iterator<NodeUpdates> updates = recoveredNodeUpdatesIterator();
Iterator<IndexEntryUpdate<LabelSchemaDescriptor>> indexEntryUpdates =
Iterators.flatMap( nodeUpdates -> convertToIndexUpdates( nodeUpdates ).iterator(), updates );
while ( indexEntryUpdates.hasNext() )
{
IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate = indexEntryUpdates.next();
monitor.applyingRecoveredUpdate( indexUpdate );
processUpdate( updaterMap, indexUpdate );
}
monitor.recoveredUpdatesApplied();
}
}
recoveredNodeIds.clear();
}

private void processUpdate( IndexUpdaterMap updaterMap, IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate ) private void processUpdate( IndexUpdaterMap updaterMap, IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate )
throws IOException, IndexEntryConflictException throws IOException, IndexEntryConflictException
{ {
Expand All @@ -559,12 +500,6 @@ private void processUpdate( IndexUpdaterMap updaterMap, IndexEntryUpdate<LabelSc
} }
} }


private Iterator<NodeUpdates> recoveredNodeUpdatesIterator()
{
PrimitiveLongIterator nodeIdIterator = recoveredNodeIds.iterator();
return new NodeUpdatesIterator( storeView, nodeIdIterator );
}

public void dropIndex( IndexRule rule ) public void dropIndex( IndexRule rule )
{ {
long indexId = rule.getId(); long indexId = rule.getId();
Expand Down

0 comments on commit 1208e3d

Please sign in to comment.