Skip to content

Commit

Permalink
Read replica catchup remake
Browse files Browse the repository at this point in the history
Introduce two states for transaction pulling and store copying respectively
to make the catchup process cleaner and improve on the error handling.
  • Loading branch information
martinfurmanski authored and Mark Needham committed Nov 18, 2016
1 parent 51bac8f commit 43b7315
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 299 deletions.
Expand Up @@ -76,8 +76,7 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
try try
{ {
log.info( "Copying store from %s", from ); log.info( "Copying store from %s", from );
long lastFlushedTxId = long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) );
storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) );


// We require at least one transaction for extracting the log index of the consensus log. // We require at least one transaction for extracting the log index of the consensus log.
// Given there might not have been any activity on the source server we need to ask for the // Given there might not have been any activity on the source server we need to ask for the
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand All @@ -43,7 +42,6 @@ public class BatchingTxApplier extends LifecycleAdapter
private final int maxBatchSize; private final int maxBatchSize;
private final Supplier<TransactionIdStore> txIdStoreSupplier; private final Supplier<TransactionIdStore> txIdStoreSupplier;
private final Supplier<TransactionCommitProcess> commitProcessSupplier; private final Supplier<TransactionCommitProcess> commitProcessSupplier;
private final Supplier<DatabaseHealth> healthSupplier;


private final PullRequestMonitor monitor; private final PullRequestMonitor monitor;
private final Log log; private final Log log;
Expand All @@ -55,13 +53,12 @@ public class BatchingTxApplier extends LifecycleAdapter
private volatile boolean stopped; private volatile boolean stopped;


public BatchingTxApplier( int maxBatchSize, Supplier<TransactionIdStore> txIdStoreSupplier, public BatchingTxApplier( int maxBatchSize, Supplier<TransactionIdStore> txIdStoreSupplier,
Supplier<TransactionCommitProcess> commitProcessSupplier, Supplier<DatabaseHealth> healthSupplier, Supplier<TransactionCommitProcess> commitProcessSupplier,
Monitors monitors, LogProvider logProvider ) Monitors monitors, LogProvider logProvider )
{ {
this.maxBatchSize = maxBatchSize; this.maxBatchSize = maxBatchSize;
this.txIdStoreSupplier = txIdStoreSupplier; this.txIdStoreSupplier = txIdStoreSupplier;
this.commitProcessSupplier = commitProcessSupplier; this.commitProcessSupplier = commitProcessSupplier;
this.healthSupplier = healthSupplier;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.monitor = monitors.newMonitor( PullRequestMonitor.class ); this.monitor = monitors.newMonitor( PullRequestMonitor.class );
} }
Expand All @@ -71,8 +68,7 @@ public void start()
{ {
stopped = false; stopped = false;
refreshFromNewStore(); refreshFromNewStore();
txQueue = txQueue = new TransactionQueue( maxBatchSize, ( first, last ) -> commitProcess.commit( first, NULL, EXTERNAL ) );
new TransactionQueue( maxBatchSize, ( first, last ) -> commitProcess.commit( first, NULL, EXTERNAL ) );
} }


@Override @Override
Expand All @@ -84,27 +80,16 @@ public void stop()
void refreshFromNewStore() void refreshFromNewStore()
{ {
assert txQueue == null || txQueue.isEmpty(); assert txQueue == null || txQueue.isEmpty();
resetLastQueuedTxId();
commitProcess = commitProcessSupplier.get();
}

public void emptyQueueAndResetLastQueuedTxId()
{
applyBatch();
resetLastQueuedTxId();
}

private void resetLastQueuedTxId()
{
lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId(); lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
commitProcess = commitProcessSupplier.get();
} }


/** /**
* Queues a transaction for application. * Queues a transaction for application.
* *
* @param tx The transaction to be queued for application. * @param tx The transaction to be queued for application.
*/ */
public void queue( CommittedTransactionRepresentation tx ) public void queue( CommittedTransactionRepresentation tx ) throws Exception
{ {
long receivedTxId = tx.getCommitEntry().getTxId(); long receivedTxId = tx.getCommitEntry().getTxId();
long expectedTxId = lastQueuedTxId + 1; long expectedTxId = lastQueuedTxId + 1;
Expand All @@ -115,15 +100,7 @@ public void queue( CommittedTransactionRepresentation tx )
return; return;
} }


try txQueue.queue( new TransactionToApply( tx.getTransactionRepresentation(), receivedTxId ) );
{
txQueue.queue( new TransactionToApply( tx.getTransactionRepresentation(), receivedTxId ) );
}
catch ( Exception e )
{
log.error( "Error while queueing transaction", e );
healthSupplier.get().panic( e );
}


if ( !stopped ) if ( !stopped )
{ {
Expand All @@ -132,17 +109,9 @@ public void queue( CommittedTransactionRepresentation tx )
} }
} }


void applyBatch() void applyBatch() throws Exception
{ {
try txQueue.empty();
{
txQueue.empty();
}
catch ( Exception e )
{
log.error( "Error during transaction application", e );
healthSupplier.get().panic( e );
}
} }


/** /**
Expand Down

0 comments on commit 43b7315

Please sign in to comment.