Skip to content

Commit

Permalink
Reworked transaction batching during recovery
Browse files Browse the repository at this point in the history
Applies all transactions immediately instead of building up 100 at a time and apply
as batch. The upside of doing that was to do index refreshing less often,
one refresh per batch.

By avoiding refresh completely during recovery and instead do one refresh after recovery
completes avoids the need to do this batching (which can require a lot of heap for
large transactions) and also avoids refreshing entirely. This change results in
even faster recovery than before while at the same time using less memory.
  • Loading branch information
tinwelint committed Dec 15, 2017
1 parent 334b1dd commit 5f22906
Show file tree
Hide file tree
Showing 18 changed files with 183 additions and 38 deletions.
Expand Up @@ -66,6 +66,16 @@ public interface IndexAccessor extends Closeable
*/ */
void force() throws IOException; void force() throws IOException;


/**
* Refreshes this index, so that {@link #newReader() readers} created after completion of this call
* will see the latest updates. This happens automatically on closing {@link #newUpdater(IndexUpdateMode)}
* w/ {@link IndexUpdateMode#ONLINE}, but not guaranteed for {@link IndexUpdateMode#RECOVERY}.
* Therefore this call is complementary for updates that has taken place with {@link IndexUpdateMode#RECOVERY}.
*
* @throws IOException if there was a problem refreshing the index.
*/
void refresh() throws IOException;

/** /**
* Closes this index accessor. There will not be any interactions after this call. * Closes this index accessor. There will not be any interactions after this call.
* After completion of this call there cannot be any essential state that hasn't been forced to disk. * After completion of this call there cannot be any essential state that hasn't been forced to disk.
Expand Down Expand Up @@ -119,6 +129,11 @@ public void force()
{ {
} }


@Override
public void refresh()
{
}

@Override @Override
public void close() public void close()
{ {
Expand Down Expand Up @@ -194,6 +209,12 @@ public void force() throws IOException
delegate.force(); delegate.force();
} }


@Override
public void refresh() throws IOException
{
delegate.refresh();
}

@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Expand Down
Expand Up @@ -90,6 +90,12 @@ public void force() throws IOException
getDelegate().force(); getDelegate().force();
} }


@Override
public void refresh() throws IOException
{
getDelegate().refresh();
}

@Override @Override
public Future<Void> close() throws IOException public Future<Void> close() throws IOException
{ {
Expand Down
Expand Up @@ -75,6 +75,11 @@ public void force()
{ {
} }


@Override
public void refresh()
{
}

@Override @Override
public IndexDescriptor getDescriptor() public IndexDescriptor getDescriptor()
{ {
Expand Down
Expand Up @@ -140,6 +140,20 @@ public void force() throws IOException
} }
} }


@Override
public void refresh() throws IOException
{
lock.readLock();
try
{
delegate.refresh();
}
finally
{
lock.readLock().unlock();
}
}

/** /**
* Acquire the {@code ReadLock} in an <i>unfair</i> way, without waiting for queued up writers. * Acquire the {@code ReadLock} in an <i>unfair</i> way, without waiting for queued up writers.
* <p/> * <p/>
Expand Down
Expand Up @@ -80,6 +80,7 @@ public interface IndexProxy extends LabelSchemaSupplier


IndexDescriptor getDescriptor(); IndexDescriptor getDescriptor();


@Override
LabelSchemaDescriptor schema(); LabelSchemaDescriptor schema();


SchemaIndexProvider.Descriptor getProviderDescriptor(); SchemaIndexProvider.Descriptor getProviderDescriptor();
Expand All @@ -95,6 +96,8 @@ public interface IndexProxy extends LabelSchemaSupplier


void force() throws IOException; void force() throws IOException;


void refresh() throws IOException;

/** /**
* @throws IndexNotFoundKernelException if the index isn't online yet. * @throws IndexNotFoundKernelException if the index isn't online yet.
*/ */
Expand Down
Expand Up @@ -24,10 +24,34 @@ public enum IndexUpdateMode
/** /**
* Used when the db is online * Used when the db is online
*/ */
ONLINE, ONLINE( false, true ),

/**
* Used when flipping from populating to online
*/
ONLINE_IDEMPOTENT( true, true ),


/** /**
* Used when the db is recoverying * Used when the db is recoverying
*/ */
RECOVERY RECOVERY( true, false );

private final boolean idempotency;
private final boolean refresh;

IndexUpdateMode( boolean idempotency, boolean refresh )
{
this.idempotency = idempotency;
this.refresh = refresh;
}

public boolean requiresIdempotency()
{
return idempotency;
}

public boolean requiresRefresh()
{
return refresh;
}
} }
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;


import org.neo4j.function.ThrowingConsumer;
import org.neo4j.function.ThrowingFunction; import org.neo4j.function.ThrowingFunction;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
Expand Down Expand Up @@ -227,6 +228,10 @@ public void start() throws Exception
{ {
state = State.STARTING; state = State.STARTING;


// Recovery will not do refresh (update read views) while applying recovered transactions and instead
// do it at one point after recovery... i.e. here
indexMapRef.indexMapSnapshot().forEachIndexProxy( indexProxyOperation( "refresh", proxy -> proxy.refresh() ) );

final Map<Long,RebuildingIndexDescriptor> rebuildingDescriptors = new HashMap<>(); final Map<Long,RebuildingIndexDescriptor> rebuildingDescriptors = new HashMap<>();
indexMapRef.modify( indexMap -> indexMapRef.modify( indexMap ->
{ {
Expand Down Expand Up @@ -565,29 +570,28 @@ public void validateIndex( long indexId )


public void forceAll() public void forceAll()
{ {
indexMapRef.indexMapSnapshot().forEachIndexProxy( forceIndexProxy() ); indexMapRef.indexMapSnapshot().forEachIndexProxy( indexProxyOperation( "force", proxy -> proxy.force() ) );
} }


private BiConsumer<Long,IndexProxy> forceIndexProxy() private BiConsumer<Long,IndexProxy> indexProxyOperation( String name, ThrowingConsumer<IndexProxy,Exception> operation )
{ {
return ( id, indexProxy ) -> return ( id, indexProxy ) ->
{ {
try try
{ {
indexProxy.force(); operation.accept( indexProxy );
} }
catch ( Exception e ) catch ( Exception e )
{ {
try try
{ {
IndexProxy proxy = indexMapRef.getIndexProxy( id ); IndexProxy proxy = indexMapRef.getIndexProxy( id );
throw new UnderlyingStorageException( "Unable to force " + proxy, e ); throw new UnderlyingStorageException( "Unable to " + name + " " + proxy, e );
} }
catch ( IndexNotFoundKernelException infe ) catch ( IndexNotFoundKernelException infe )
{ {
// index was dropped while we where try to flush it, we can continue to flush other indexes // index was dropped while trying to operate on it, we can continue to other indexes
} }

} }
}; };
} }
Expand Down
Expand Up @@ -79,6 +79,7 @@ public OnlineIndexProxy( long indexId, IndexDescriptor descriptor,
IndexAccessor accessor, IndexStoreView storeView, SchemaIndexProvider.Descriptor providerDescriptor, IndexAccessor accessor, IndexStoreView storeView, SchemaIndexProvider.Descriptor providerDescriptor,
boolean forcedIdempotentMode ) boolean forcedIdempotentMode )
{ {
assert accessor != null;
this.indexId = indexId; this.indexId = indexId;
this.descriptor = descriptor; this.descriptor = descriptor;
this.storeView = storeView; this.storeView = storeView;
Expand All @@ -97,7 +98,10 @@ public void start()
@Override @Override
public IndexUpdater newUpdater( final IndexUpdateMode mode ) public IndexUpdater newUpdater( final IndexUpdateMode mode )
{ {
IndexUpdater actual = accessor.newUpdater( forcedIdempotentMode ? IndexUpdateMode.RECOVERY : mode ); // If this proxy is flagged with taking extra care about idempotency then escalate ONLINE to ONLINE_IDEMPOTENT.
// At the time of writing this there's no other mode that needs to be escalated.
IndexUpdater actual = accessor.newUpdater(
mode == IndexUpdateMode.ONLINE && forcedIdempotentMode ? IndexUpdateMode.ONLINE_IDEMPOTENT : mode );
return started ? updateCountingUpdater( actual ) : actual; return started ? updateCountingUpdater( actual ) : actual;
} }


Expand Down Expand Up @@ -144,6 +148,12 @@ public void force() throws IOException
accessor.force(); accessor.force();
} }


@Override
public void refresh() throws IOException
{
accessor.refresh();
}

@Override @Override
public Future<Void> close() throws IOException public Future<Void> close() throws IOException
{ {
Expand Down
Expand Up @@ -119,7 +119,13 @@ public InternalIndexState getState()
@Override @Override
public void force() public void force()
{ {
// Ignored... this isn't controlled from the outside while we're populating the index. // Ignored... this isn't called from the outside while we're populating the index.
}

@Override
public void refresh()
{
// Ignored... this isn't called from the outside while we're populating the index.
} }


@Override @Override
Expand Down
Expand Up @@ -95,6 +95,12 @@ public void force() throws IOException
tree.checkpoint( IOLimiter.unlimited() ); tree.checkpoint( IOLimiter.unlimited() );
} }


@Override
public void refresh()
{
// not required in this implementation
}

@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Expand Down
Expand Up @@ -83,6 +83,13 @@ public void force() throws IOException
luceneAccessor.force(); luceneAccessor.force();
} }


@Override
public void refresh() throws IOException
{
nativeAccessor.refresh();
luceneAccessor.refresh();
}

@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException; import java.io.IOException;


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
Expand Down Expand Up @@ -83,7 +82,7 @@ public void startRecovery()
@Override @Override
public RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception public RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception
{ {
return new RecoveryVisitor( new TransactionQueue( 100, ( first, last ) -> storageEngine.apply( first, mode ) ) ); return new RecoveryVisitor( storageEngine, mode );
} }


@Override @Override
Expand Down Expand Up @@ -118,11 +117,13 @@ public void allTransactionsRecovered( CommittedTransactionRepresentation lastRec


static class RecoveryVisitor implements RecoveryApplier static class RecoveryVisitor implements RecoveryApplier
{ {
private final TransactionQueue transactionsToApply; private final StorageEngine storageEngine;
private final TransactionApplicationMode mode;


RecoveryVisitor( TransactionQueue transactionsToApply ) RecoveryVisitor( StorageEngine storageEngine, TransactionApplicationMode mode )
{ {
this.transactionsToApply = transactionsToApply; this.storageEngine = storageEngine;
this.mode = mode;
} }


@Override @Override
Expand All @@ -133,14 +134,13 @@ public boolean visit( CommittedTransactionRepresentation transaction ) throws Ex
TransactionToApply tx = new TransactionToApply( txRepresentation, txId ); TransactionToApply tx = new TransactionToApply( txRepresentation, txId );
tx.commitment( NO_COMMITMENT, txId ); tx.commitment( NO_COMMITMENT, txId );
tx.logPosition( transaction.getStartEntry().getStartPosition() ); tx.logPosition( transaction.getStartEntry().getStartPosition() );
transactionsToApply.queue( tx ); storageEngine.apply( tx, mode );
return false; return false;
} }


@Override @Override
public void close() throws Exception public void close() throws Exception
{ { // nothing to close
transactionsToApply.empty();
} }
} }
} }
Expand Up @@ -140,5 +140,4 @@ void createCommands(


@Deprecated @Deprecated
void clearBufferedIds(); void clearBufferedIds();

} }
Expand Up @@ -65,6 +65,11 @@ public void force()
{ {
} }


@Override
public void refresh()
{
}

@Override @Override
public Future<Void> close() public Future<Void> close()
{ {
Expand Down
Expand Up @@ -193,7 +193,7 @@ public void shouldBringIndexOnlineAndFlipOverToIndexAccessor() throws Exception
InOrder order = inOrder( populator, accessor, updater); InOrder order = inOrder( populator, accessor, updater);
order.verify( populator ).create(); order.verify( populator ).create();
order.verify( populator ).close( true ); order.verify( populator ).close( true );
order.verify( accessor ).newUpdater( IndexUpdateMode.RECOVERY ); order.verify( accessor ).newUpdater( IndexUpdateMode.ONLINE_IDEMPOTENT );
order.verify( updater ).process( add( 10, "foo" ) ); order.verify( updater ).process( add( 10, "foo" ) );
order.verify( updater ).close(); order.verify( updater ).close();
} }
Expand Down Expand Up @@ -400,6 +400,8 @@ public void shouldLogIndexStateOnStart() throws Exception
.thenReturn( InternalIndexState.POPULATING ); .thenReturn( InternalIndexState.POPULATING );
when( provider.getInitialState( failedIndex.getId(), failedIndex.getIndexDescriptor() ) ) when( provider.getInitialState( failedIndex.getId(), failedIndex.getIndexDescriptor() ) )
.thenReturn( InternalIndexState.FAILED ); .thenReturn( InternalIndexState.FAILED );
when( provider.getOnlineAccessor( anyLong(), any( IndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenAnswer(
invocation -> mock( IndexAccessor.class ) );


indexingService.init(); indexingService.init();


Expand Down Expand Up @@ -1049,6 +1051,29 @@ public void failForceAllWhenOneOfTheIndexesFailToForce() throws IOException
indexingService.forceAll(); indexingService.forceAll();
} }


@Test
public void shouldRefreshIndexesOnStart() throws Exception
{
// given
IndexRule rule = IndexRule.indexRule( 0, index, PROVIDER_DESCRIPTOR );
IndexingService indexing = newIndexingServiceWithMockedDependencies( populator, accessor, withData(), rule );

IndexAccessor accessor = mock( IndexAccessor.class );
IndexUpdater updater = mock( IndexUpdater.class );
when( accessor.newUpdater( any( IndexUpdateMode.class ) ) ).thenReturn( updater );
when( indexProvider.getOnlineAccessor( eq( rule.getId() ), any( IndexDescriptor.class ),
any( IndexSamplingConfig.class ) ) ).thenReturn( accessor );

life.init();

verify( accessor, times( 0 ) ).refresh();

life.start();

// Then
verify( accessor, times( 1 ) ).refresh();
}

private IndexProxy createIndexProxyMock() private IndexProxy createIndexProxyMock()
{ {
IndexProxy proxy = mock( IndexProxy.class ); IndexProxy proxy = mock( IndexProxy.class );
Expand Down
Expand Up @@ -188,6 +188,11 @@ public void force()
{ {
} }


@Override
public void refresh()
{
}

@Override @Override
public void drop() public void drop()
{ {
Expand Down

0 comments on commit 5f22906

Please sign in to comment.