From e41177089ed005a81712997cf9d2e6a94baec1e2 Mon Sep 17 00:00:00 2001
From: Mark Needham
Date: Tue, 15 Nov 2016 13:36:55 +0000
Subject: [PATCH] Start a new TX pull request from the lastCommittedTxId rather than the last queued id.

Also adding more logging so we can see what batch we're on and the address of the core server if a PR fails
---
 .../causalclustering/catchup/CatchUpClient.java     |  3 ++-
 .../catchup/tx/BatchingTxApplier.java               | 13 ++++++++++++-
 .../catchup/tx/TxPollingClient.java                 |  9 ++++++---
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java
index d584b4903e81..b39cbdab713d 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java
@@ -92,7 +92,8 @@ public T makeBlockingRequest( MemberId target, CatchUpRequest request,
         channel.setResponseHandler( responseHandler, future );
         channel.send( request );
 
-        String operation = String.format( "Timed out executing operation %s on %s", request, target );
+        String operation = String.format( "Timed out executing operation %s on %s (%s)",
+                request, target, catchUpAddress.get() );
 
         return waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis );
     }
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java
index fe4f084577b3..13642458ddd8 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java
@@ -84,10 +84,21 @@ public void stop()
     void refreshFromNewStore()
     {
         assert txQueue == null || txQueue.isEmpty();
-        lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
+        resetLastQueuedTxId();
         commitProcess = commitProcessSupplier.get();
     }
 
+    public void emptyQueueAndResetLastQueuedTxId()
+    {
+        applyBatch();
+        resetLastQueuedTxId();
+    }
+
+    private void resetLastQueuedTxId()
+    {
+        lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId();
+    }
+
     /**
      * Queues a transaction for application.
      *
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java
index 5adb2ce4fa7a..5561d5579186 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java
@@ -105,6 +105,7 @@ public synchronized void start() throws Throwable
     private synchronized void onTimeout()
     {
         timeout.renew();
+        applier.emptyQueueAndResetLastQueuedTxId();
 
         try
         {
@@ -112,9 +113,11 @@ private synchronized void onTimeout()
             StoreId localStoreId = localDatabase.storeId();
 
             boolean moreToPull = true;
+            int batchCount = 1;
             while ( moreToPull )
             {
-                moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId );
+                moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount );
+                batchCount++;
             }
         }
         catch ( Throwable e )
@@ -123,12 +126,12 @@ private synchronized void onTimeout()
         }
     }
 
-    private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId ) throws Throwable
+    private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable
     {
         long lastQueuedTxId = applier.lastQueuedTxId();
         pullRequestMonitor.txPullRequest( lastQueuedTxId );
         TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
-        log.info( "Pull transactions where tx id > %d", lastQueuedTxId );
+        log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount );
         CatchupResult catchupResult = catchUpClient.makeBlockingRequest( core, txPullRequest,
                 new CatchUpResponseAdaptor()
                 {