From 0e2dcecc84160430efbb54f872b61e47b8b8847f Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Tue, 22 Nov 2016 16:40:23 +0100 Subject: [PATCH] Clarify interface and variables around tx pulling. --- .../catchup/storecopy/StoreFetcher.java | 16 ++++++---------- .../catchup/tx/TxPullClient.java | 8 ++++---- .../catchup/storecopy/StoreFetcherTest.java | 3 ++- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java index 9f4ec7b09d16e..9cc5848139278 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java @@ -67,7 +67,7 @@ public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File { ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir ); long lastCommittedTxId = transactionIdStore.getLastCommittedTransactionId(); - return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId ); + return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId + 1 ); } public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) @@ -78,13 +78,9 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) log.info( "Copying store from %s", from ); long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) ); - // We require at least one transaction for extracting the log index of the consensus log. - // Given there might not have been any activity on the source server we need to ask for the - // log entry for the lastFlushedTxId even though we've already applied its contents - long pullTxIndex = lastFlushedTxId - 1; - log.info( "Store files need to be recovered starting from: %d", pullTxIndex ); + log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId ); - CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, pullTxIndex ); + CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, lastFlushedTxId ); if ( catchupResult != SUCCESS_END_OF_STREAM ) { throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult ); @@ -102,14 +98,14 @@ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, { log.info( "Pulling transactions from: %d", fromTxId ); - long pullRequestTxId = fromTxId; + long previousTxId = fromTxId - 1; CatchupResult lastStatus; do { - TxPullRequestResult result = txPullClient.pullTransactions( from, expectedStoreId, pullRequestTxId, writer ); + TxPullRequestResult result = txPullClient.pullTransactions( from, expectedStoreId, previousTxId, writer ); lastStatus = result.catchupResult(); - pullRequestTxId = result.lastTxId(); + previousTxId = result.lastTxId(); } while ( lastStatus == SUCCESS_END_OF_BATCH ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java index a2904a432d634..bdc534603880a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java @@ -40,15 +40,15 @@ public TxPullClient( CatchUpClient catchUpClient, Monitors monitors ) this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); } - public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long startTxId, + public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long previousTxId, TxPullResponseListener txPullResponseListener ) throws CatchUpClientException { - pullRequestMonitor.txPullRequest( startTxId ); - return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ), + pullRequestMonitor.txPullRequest( previousTxId ); + return catchUpClient.makeBlockingRequest( from, new TxPullRequest( previousTxId, storeId ), new CatchUpResponseAdaptor() { - private long lastTxIdReceived = startTxId; + private long lastTxIdReceived = previousTxId; @Override public void onTxPullResponse( CompletableFuture signal, diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java index bb2601d2d27c1..835bbd73982a0 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java @@ -95,7 +95,8 @@ public void shouldSetLastPulledTransactionId() throws Exception fetcher.copyStore( localhost, wantedStoreId, new File( "destination" ) ); // then - verify( txPullClient ).pullTransactions( eq( localhost ), eq( wantedStoreId ), eq( lastFlushedTxId - 1 ), any( TxPullResponseListener.class ) ); + long previousTxId = lastFlushedTxId - 1; // the interface is defined as asking for the one preceding + verify( txPullClient ).pullTransactions( eq( localhost ), eq( wantedStoreId ), eq( previousTxId ), any( TxPullResponseListener.class ) ); } @Test