Skip to content

Commit

Permalink
Clarify interface and variables around tx pulling.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Nov 24, 2016
1 parent a18f649 commit 0e2dcec
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
Expand Up @@ -67,7 +67,7 @@ public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File
{ {
ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir ); ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir );
long lastCommittedTxId = transactionIdStore.getLastCommittedTransactionId(); long lastCommittedTxId = transactionIdStore.getLastCommittedTransactionId();
return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId ); return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId + 1 );
} }


public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
Expand All @@ -78,13 +78,9 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
log.info( "Copying store from %s", from ); log.info( "Copying store from %s", from );
long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) ); long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) );


// We require at least one transaction for extracting the log index of the consensus log. log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId );
// Given there might not have been any activity on the source server we need to ask for the
// log entry for the lastFlushedTxId even though we've already applied its contents
long pullTxIndex = lastFlushedTxId - 1;
log.info( "Store files need to be recovered starting from: %d", pullTxIndex );


CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, pullTxIndex ); CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, lastFlushedTxId );
if ( catchupResult != SUCCESS_END_OF_STREAM ) if ( catchupResult != SUCCESS_END_OF_STREAM )
{ {
throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult ); throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult );
Expand All @@ -102,14 +98,14 @@ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId,
{ {
log.info( "Pulling transactions from: %d", fromTxId ); log.info( "Pulling transactions from: %d", fromTxId );


long pullRequestTxId = fromTxId; long previousTxId = fromTxId - 1;


CatchupResult lastStatus; CatchupResult lastStatus;
do do
{ {
TxPullRequestResult result = txPullClient.pullTransactions( from, expectedStoreId, pullRequestTxId, writer ); TxPullRequestResult result = txPullClient.pullTransactions( from, expectedStoreId, previousTxId, writer );
lastStatus = result.catchupResult(); lastStatus = result.catchupResult();
pullRequestTxId = result.lastTxId(); previousTxId = result.lastTxId();
} }
while ( lastStatus == SUCCESS_END_OF_BATCH ); while ( lastStatus == SUCCESS_END_OF_BATCH );


Expand Down
Expand Up @@ -40,15 +40,15 @@ public TxPullClient( CatchUpClient catchUpClient, Monitors monitors )
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
} }


public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long startTxId, public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long previousTxId,
TxPullResponseListener txPullResponseListener ) TxPullResponseListener txPullResponseListener )
throws CatchUpClientException throws CatchUpClientException
{ {
pullRequestMonitor.txPullRequest( startTxId ); pullRequestMonitor.txPullRequest( previousTxId );
return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ), return catchUpClient.makeBlockingRequest( from, new TxPullRequest( previousTxId, storeId ),
new CatchUpResponseAdaptor<TxPullRequestResult>() new CatchUpResponseAdaptor<TxPullRequestResult>()
{ {
private long lastTxIdReceived = startTxId; private long lastTxIdReceived = previousTxId;


@Override @Override
public void onTxPullResponse( CompletableFuture<TxPullRequestResult> signal, public void onTxPullResponse( CompletableFuture<TxPullRequestResult> signal,
Expand Down
Expand Up @@ -95,7 +95,8 @@ public void shouldSetLastPulledTransactionId() throws Exception
fetcher.copyStore( localhost, wantedStoreId, new File( "destination" ) ); fetcher.copyStore( localhost, wantedStoreId, new File( "destination" ) );


// then // then
verify( txPullClient ).pullTransactions( eq( localhost ), eq( wantedStoreId ), eq( lastFlushedTxId - 1 ), any( TxPullResponseListener.class ) ); long previousTxId = lastFlushedTxId - 1; // the interface is defined as asking for the one preceding
verify( txPullClient ).pullTransactions( eq( localhost ), eq( wantedStoreId ), eq( previousTxId ), any( TxPullResponseListener.class ) );
} }


@Test @Test
Expand Down

0 comments on commit 0e2dcec

Please sign in to comment.