Skip to content

Commit

Permalink
Start a new TX pull request from the lastCommittedTxId rather than
Browse files Browse the repository at this point in the history
the last queued id.

Also adding more logging so we can see what batch we're on and the
address of the core server if a PR fails
  • Loading branch information
Mark Needham committed Nov 15, 2016
1 parent 86273a1 commit e411770
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
Expand Up @@ -92,7 +92,8 @@ public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
channel.setResponseHandler( responseHandler, future );
channel.send( request );

String operation = String.format( "Timed out executing operation %s on %s", request, target );
String operation = String.format( "Timed out executing operation %s on %s (%s)",
request, target, catchUpAddress.get() );

return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis );
}
Expand Down
Expand Up @@ -84,10 +84,21 @@ public void stop()
void refreshFromNewStore()
{
assert txQueue == null || txQueue.isEmpty();
lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
resetLastQueuedTxId();
commitProcess = commitProcessSupplier.get();
}

public void emptyQueueAndResetLastQueuedTxId()
{
applyBatch();
resetLastQueuedTxId();
}

private void resetLastQueuedTxId()
{
lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
}

/**
* Queues a transaction for application.
*
Expand Down
Expand Up @@ -105,16 +105,19 @@ public synchronized void start() throws Throwable
private synchronized void onTimeout()
{
timeout.renew();
applier.emptyQueueAndResetLastQueuedTxId();

try
{
MemberId core = connectionStrategy.coreMember();
StoreId localStoreId = localDatabase.storeId();

boolean moreToPull = true;
int batchCount = 1;
while ( moreToPull )
{
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId );
moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount );
batchCount++;
}
}
catch ( Throwable e )
Expand All @@ -123,12 +126,12 @@ private synchronized void onTimeout()
}
}

private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId ) throws Throwable
private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable
{
long lastQueuedTxId = applier.lastQueuedTxId();
pullRequestMonitor.txPullRequest( lastQueuedTxId );
TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
log.info( "Pull transactions where tx id > %d", lastQueuedTxId );
log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount );
CatchupResult catchupResult =
catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor<CatchupResult>()
{
Expand Down

0 comments on commit e411770

Please sign in to comment.