diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TxPullRequestResult.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TxPullRequestResult.java
new file mode 100644
index 0000000000000..adb2c8db0d226
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/TxPullRequestResult.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.catchup;
+
+public class TxPullRequestResult
+{
+ private final CatchupResult catchupResult;
+ private final long lastTxId;
+
+ public TxPullRequestResult( CatchupResult catchupResult, long lastTxId )
+ {
+ this.catchupResult = catchupResult;
+ this.lastTxId = lastTxId;
+ }
+
+ public CatchupResult catchupResult()
+ {
+ return catchupResult;
+ }
+
+ public long lastTxId()
+ {
+ return lastTxId;
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java
index edbcb699cee9a..4aa3974f79e82 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java
@@ -24,6 +24,7 @@
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchupResult;
+import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
@@ -35,6 +36,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
+import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_BATCH;
import static org.neo4j.causalclustering.catchup.CatchupResult.SUCCESS_END_OF_STREAM;
public class StoreFetcher
@@ -68,19 +70,6 @@ public CatchupResult tryCatchingUp( MemberId from, StoreId expectedStoreId, File
return pullTransactions( from, expectedStoreId, storeDir, lastCommittedTxId );
}
- private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId ) throws IOException, StoreCopyFailedException
- {
- try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
- {
- log.info( "Pulling transactions from: %d", fromTxId );
- return txPullClient.pullTransactions( from, expectedStoreId, fromTxId, writer );
- }
- catch ( CatchUpClientException e )
- {
- throw new StoreCopyFailedException( e );
- }
- }
-
public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
throws StoreCopyFailedException, StreamingTransactionsFailedException
{
@@ -108,6 +97,31 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
}
}
+ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId ) throws IOException, StoreCopyFailedException
+ {
+ try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
+ {
+ log.info( "Pulling transactions from: %d", fromTxId );
+
+ long pullRequestTxId = fromTxId;
+
+ CatchupResult lastStatus;
+ do
+ {
+ TxPullRequestResult result = txPullClient.pullTransactions( from, expectedStoreId, pullRequestTxId, writer );
+ lastStatus = result.catchupResult();
+ pullRequestTxId = result.lastTxId();
+ }
+ while ( lastStatus == SUCCESS_END_OF_BATCH );
+
+ return lastStatus;
+ }
+ catch ( CatchUpClientException e )
+ {
+ throw new StoreCopyFailedException( e );
+ }
+ }
+
public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedException
{
return storeCopyClient.fetchStoreId( from );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java
index c055daca83be1..a2904a432d634 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullClient.java
@@ -24,7 +24,7 @@
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
-import org.neo4j.causalclustering.catchup.CatchupResult;
+import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.kernel.monitoring.Monitors;
@@ -40,24 +40,29 @@ public TxPullClient( CatchUpClient catchUpClient, Monitors monitors )
this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class );
}
- public CatchupResult pullTransactions( MemberId from, StoreId storeId, long startTxId,
- TxPullResponseListener txPullResponseListener ) throws CatchUpClientException
+ public TxPullRequestResult pullTransactions( MemberId from, StoreId storeId, long startTxId,
+ TxPullResponseListener txPullResponseListener )
+ throws CatchUpClientException
{
pullRequestMonitor.txPullRequest( startTxId );
return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ),
- new CatchUpResponseAdaptor()
+ new CatchUpResponseAdaptor()
{
+ private long lastTxIdReceived = startTxId;
+
@Override
- public void onTxPullResponse( CompletableFuture signal, TxPullResponse response )
+ public void onTxPullResponse( CompletableFuture signal,
+ TxPullResponse response )
{
+ this.lastTxIdReceived = response.tx().getCommitEntry().getTxId();
txPullResponseListener.onTxReceived( response );
}
@Override
- public void onTxStreamFinishedResponse( CompletableFuture signal,
+ public void onTxStreamFinishedResponse( CompletableFuture signal,
TxStreamFinishedResponse response )
{
- signal.complete( response.status() );
+ signal.complete( new TxPullRequestResult(response.status(), lastTxIdReceived ));
}
} );
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
index ae500ec3cc435..d5f2840a5fe32 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java
@@ -185,8 +185,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
BatchingTxApplier batchingTxApplier = new BatchingTxApplier( maxBatchSize,
dependencies.provideDependency( TransactionIdStore.class ),
writableCommitProcess, databaseHealthSupplier, platformModule.monitors, logProvider );
-// ContinuousJob txApplyJob = new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group(
-// "tx-applier", NEW_THREAD ), batchingTxApplier, logProvider );
DelayedRenewableTimeoutService txPullerTimeoutService =
new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider );
@@ -215,7 +213,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
dependencies.satisfyDependencies( txPuller );
txPulling.add( batchingTxApplier );
-// txPulling.add( txApplyJob );
txPulling.add( txPuller );
txPulling.add( txPullerTimeoutService );
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java
index bb18fb3a5d798..bb2601d2d27c1 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcherTest.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.UUID;
+import org.neo4j.causalclustering.catchup.TxPullRequestResult;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpWriter;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
@@ -54,7 +55,7 @@ public void shouldCopyStoreFilesAndPullTransactions() throws Exception
StoreId storeId = new StoreId( 1, 2, 3, 4 );
StoreCopyClient storeCopyClient = mock( StoreCopyClient.class );
TxPullClient txPullClient = mock( TxPullClient.class );
- when( txPullClient.pullTransactions( any(), any(), anyLong(), any() ) ).thenReturn( SUCCESS_END_OF_STREAM );
+ when( txPullClient.pullTransactions( any(), any(), anyLong(), any() ) ).thenReturn( new TxPullRequestResult( SUCCESS_END_OF_STREAM, 13) );
TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );
StoreFetcher fetcher = new StoreFetcher( NullLogProvider.getInstance(), mock( FileSystemAbstraction.class ),
@@ -83,7 +84,7 @@ public void shouldSetLastPulledTransactionId() throws Exception
TxPullClient txPullClient = mock( TxPullClient.class );
when( txPullClient.pullTransactions( eq( localhost ), eq( wantedStoreId ), anyLong(), any( TxPullResponseListener.class ) ) )
- .thenReturn( SUCCESS_END_OF_STREAM );
+ .thenReturn( new TxPullRequestResult( SUCCESS_END_OF_STREAM, 13) );
TransactionLogCatchUpWriter writer = mock( TransactionLogCatchUpWriter.class );