diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchUpClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchUpClient.java index ded5c0519f23e..85ad80d37641f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchUpClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/CatchUpClient.java @@ -46,6 +46,8 @@ public class CatchUpClient extends LifecycleAdapter { + private static final int DEFAULT_INACTIVITY_TIMEOUT = 5; // seconds + private final LogProvider logProvider; private final TopologyService discoveryService; private final Log log; @@ -64,14 +66,20 @@ public CatchUpClient( TopologyService discoveryService, LogProvider logProvider, this.monitors = monitors; } - public T makeBlockingRequest( MemberId memberId, CatchUpRequest request, + public T makeBlockingRequest( MemberId target, CatchUpRequest request, + CatchUpResponseCallback responseHandler ) throws CatchUpClientException, NoKnownAddressesException + { + return makeBlockingRequest( target, request, DEFAULT_INACTIVITY_TIMEOUT, TimeUnit.SECONDS, responseHandler ); + } + + public T makeBlockingRequest( MemberId target, CatchUpRequest request, long inactivityTimeout, TimeUnit timeUnit, CatchUpResponseCallback responseHandler ) throws CatchUpClientException, NoKnownAddressesException { CompletableFuture future = new CompletableFuture<>(); AdvertisedSocketAddress catchUpAddress = - discoveryService.coreServers().find( memberId ).getCatchupServer(); + discoveryService.coreServers().find( target ).getCatchupServer(); CatchUpChannel channel = pool.acquire( catchUpAddress ); future.whenComplete( ( result, e ) -> { @@ -88,7 +96,7 @@ public T makeBlockingRequest( MemberId memberId, CatchUpRequest request, channel.setResponseHandler( responseHandler, future ); channel.send( request ); - String operation = String.format( "Timed out executing operation %s on %s", request, memberId ); + String operation = String.format( "Timed out executing operation %s on %s", request, target ); return TimeoutLoop.waitForCompletion( future, operation, channel::millisSinceLastResponse, inactivityTimeout, timeUnit ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java index 8b55d6f801a7e..b0e937766d016 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/storecopy/StoreCopyClient.java @@ -45,7 +45,7 @@ long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams st { try { - return catchUpClient.makeBlockingRequest( from, new GetStoreRequest( expectedStoreId ), 30, SECONDS, + return catchUpClient.makeBlockingRequest( from, new GetStoreRequest( expectedStoreId ), new CatchUpResponseAdaptor() { private long expectedBytes = 0; @@ -89,7 +89,7 @@ StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException { try { - return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), 30, SECONDS, + return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), new CatchUpResponseAdaptor() { @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java index 9cc18586e14e1..8ad98a6ffc0c1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPollingClient.java @@ -106,7 +106,7 @@ private synchronized void onTimeout() long lastAppliedTxId = applier.lastAppliedTxId(); pullRequestMonitor.txPullRequest( lastAppliedTxId ); TxPullRequest txPullRequest = new TxPullRequest( lastAppliedTxId, localDatabase.get() ); - catchUpClient.makeBlockingRequest( transactionServer, txPullRequest, 30, TimeUnit.SECONDS, + catchUpClient.makeBlockingRequest( transactionServer, txPullRequest, new CatchUpResponseAdaptor() { @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java index 4e29ef07558b2..89d55f6140b02 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/catchup/tx/TxPullClient.java @@ -48,7 +48,7 @@ public CatchupResult pullTransactions( MemberId from, StoreId storeId, long star throws CatchUpClientException, NoKnownAddressesException { pullRequestMonitor.txPullRequest( startTxId ); - return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ), 30, SECONDS, + return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ), new CatchUpResponseAdaptor() { @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java index 78293d99f441b..e1b84b9e6dfb7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/snapshot/CoreStateDownloader.java @@ -88,7 +88,7 @@ public synchronized void downloadSnapshot( MemberId source, CoreState coreState * in the copied store. */ CoreSnapshot coreSnapshot = catchUpClient.makeBlockingRequest( source, new CoreSnapshotRequest(), - 1, MINUTES, new CatchUpResponseAdaptor() + new CatchUpResponseAdaptor() { @Override public void onCoreSnapshot( CompletableFuture signal, CoreSnapshot response ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java index 2d91598b70210..31c0ce12af4a6 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/catchup/tx/TxPollingClientTest.java @@ -58,12 +58,11 @@ public class TxPollingClientTest private final BatchingTxApplier txApplier = mock( BatchingTxApplier.class ); private final ControlledRenewableTimeoutService timeoutService = new ControlledRenewableTimeoutService(); - private final long txPullTimeoutMillis = 100; + private final long txPullIntervalMillis = 100; private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), () -> storeId, - catchUpClient, serverSelection, - timeoutService, txPullTimeoutMillis, txApplier, new Monitors() ); + catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors() ); @Before public void before() throws Throwable @@ -85,7 +84,7 @@ public void shouldSendPullRequestOnTick() throws Throwable // then verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - anyLong(), any( TimeUnit.class ), any( CatchUpResponseCallback.class ) ); + any( CatchUpResponseCallback.class ) ); } @Test @@ -99,7 +98,7 @@ public void shouldNotScheduleNewPullIfThereIsWorkPending() throws Exception // then verify( catchUpClient, never() ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - anyLong(), any( TimeUnit.class ), any( CatchUpResponseCallback.class ) ); + any( CatchUpResponseCallback.class ) ); } @Test @@ -111,7 +110,7 @@ public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable ArgumentCaptor captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class ); verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - anyLong(), any( TimeUnit.class ), captor.capture() ); + captor.capture() ); captor.getValue().onTxPullResponse( null, new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) );