diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java index 601859522202f..5561d5579186e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.catchup.tx; import java.util.concurrent.CompletableFuture; -import java.util.function.BooleanSupplier; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; @@ -29,23 +28,20 @@ import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy; import org.neo4j.causalclustering.readreplica.CopyStoreSafely; -import org.neo4j.function.Predicates; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; +import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT; /** * This class is responsible for pulling transactions from a core server and queuing @@ -70,33 +66,18 @@ enum Timeouts implements TimeoutName private final CopiedStoreRecovery copiedStoreRecovery; private final CatchUpClient catchUpClient; private final CoreMemberSelectionStrategy connectionStrategy; + private final RenewableTimeoutService timeoutService; private final long txPullIntervalMillis; private final BatchingTxApplier applier; private final PullRequestMonitor pullRequestMonitor; - private volatile JobScheduler.JobHandle handle; - private final JobScheduler scheduler; - private volatile boolean stopped; - private volatile boolean polling; - - public static final JobScheduler.Group txPolling = new JobScheduler.Group( "TxPolling", POOLED ); - - private final BooleanSupplier txPollingCondition = new BooleanSupplier() - { - @Override - public boolean getAsBoolean() - { - return !polling; - } - }; - + private RenewableTimeout timeout; public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase, - Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, - CoreMemberSelectionStrategy connectionStrategy, - long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, - CopiedStoreRecovery copiedStoreRecovery, - JobScheduler scheduler ) + Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, + CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService, + long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, + CopiedStoreRecovery copiedStoreRecovery ) { this.fs = fs; this.localDatabase = localDatabase; @@ -105,71 +86,45 @@ public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, Local this.storeFetcher = storeFetcher; this.catchUpClient = catchUpClient; this.connectionStrategy = connectionStrategy; + this.timeoutService = timeoutService; this.txPullIntervalMillis = txPullIntervalMillis; this.applier = applier; this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); this.copiedStoreRecovery = copiedStoreRecovery; - - this.scheduler = scheduler; } @Override public synchronized void start() throws Throwable { - handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS ); + timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() ); } - @Override - public void stop() throws Throwable + /** + * Time to pull! + */ + private synchronized void onTimeout() { - stopped = true; - if ( handle != null ) - { - handle.cancel( false ); - } - } + timeout.renew(); + applier.emptyQueueAndResetLastQueuedTxId(); - private final Runnable job = new Runnable() - { - @Override - public void run() + try { - try - { - polling = true; - if ( stopped ) - { - return; - } + MemberId core = connectionStrategy.coreMember(); + StoreId localStoreId = localDatabase.storeId(); - applier.emptyQueueAndResetLastQueuedTxId(); - MemberId core = connectionStrategy.coreMember(); - StoreId localStoreId = localDatabase.storeId(); - - boolean moreToPull = true; - int batchCount = 1; - while ( moreToPull ) - { - moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount ); - batchCount++; - } - } - catch ( Throwable e ) + boolean moreToPull = true; + int batchCount = 1; + while ( moreToPull ) { - log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e ); - } - finally - { - polling = false; - } - - // reschedule only if it is not stopped - if ( !stopped ) - { - handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS ); + moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount ); + batchCount++; } } - }; + catch ( Throwable e ) + { + log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e ); + } + } private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable { @@ -184,6 +139,9 @@ private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localSto public void onTxPullResponse( CompletableFuture signal, TxPullResponse response ) { applier.queue( response.tx() ); + // no applying here, just put it in the batch + + timeout.renew(); } @Override @@ -209,13 +167,16 @@ public void onTxStreamFinishedResponse( CompletableFuture signal, downloadDatabase( core, localStoreId ); return false; default: - log.info( "Tx pull unable to get transactions > %d ", lastQueuedTxId ); + log.info( "Tx pull unable to get transactions > %d " + lastQueuedTxId ); return false; } } private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Throwable { + pause(); + try + { localDatabase.stop(); startStopOnStoreCopy.stop(); new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ). @@ -223,5 +184,20 @@ private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Thro localDatabase.start(); startStopOnStoreCopy.start(); applier.refreshFromNewStore(); + } + finally + { + resume(); + } + } + + public void pause() + { + timeout.cancel(); + } + + public void resume() + { + timeout.renew(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 6c59bad6765c8..4f23be23ce9f9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -81,7 +81,6 @@ import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.state.DataSourceManager; -import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DefaultKernelData; import org.neo4j.kernel.lifecycle.LifeSupport; @@ -236,9 +235,9 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data TxPollingClient txPuller = new TxPollingClient( logProvider, fileSystem, localDatabase, servicesToStopOnStoreCopy, storeFetcher, - catchUpClient, new ConnectToRandomCoreMember( discoveryService ), + catchUpClient, new ConnectToRandomCoreMember( discoveryService ), txPullerTimeoutService, config.get( CausalClusteringSettings.pull_interval ), batchingTxApplier, - platformModule.monitors, copiedStoreRecovery, platformModule.jobScheduler ); + platformModule.monitors, copiedStoreRecovery ); dependencies.satisfyDependencies( txPuller ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java index 158488da7c982..727fb423b22e5 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java @@ -24,7 +24,6 @@ import org.mockito.ArgumentCaptor; import java.io.File; -import java.util.concurrent.TimeUnit; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpResponseCallback; @@ -39,25 +38,18 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; -import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; -import org.neo4j.test.OnDemandJobScheduler; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT; -import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.txPolling; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; -import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint; public class TxPollingClientTest { @@ -75,17 +67,15 @@ public class TxPollingClientTest private final CopiedStoreRecovery copiedStoreRecovery = mock( CopiedStoreRecovery.class ); private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); private final LocalDatabase localDatabase = mock( LocalDatabase.class ); - { when( localDatabase.storeId() ).thenReturn( storeId ); } private final Lifecycle startStopOnStoreCopy = mock( Lifecycle.class ); - private final OnDemandJobScheduler scheduler = spy( new OnDemandJobScheduler()); private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), fs, localDatabase, startStopOnStoreCopy, storeFetcher, - catchUpClient, serverSelection, txPullIntervalMillis, txApplier, new Monitors(), - copiedStoreRecovery, scheduler ); + catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors(), + copiedStoreRecovery ); @Before public void before() throws Throwable @@ -103,7 +93,7 @@ public void shouldSendPullRequestOnTick() throws Throwable when( txApplier.lastQueuedTxId() ).thenReturn( lastAppliedTxId ); // when - scheduler.runJob(); + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), @@ -122,7 +112,7 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable any( CatchUpResponseCallback.class ) )) .thenReturn( CatchupResult.SUCCESS_END_OF_BATCH, CatchupResult.SUCCESS_END_OF_STREAM ); - scheduler.runJob(); + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then verify( catchUpClient, times( 2 ) ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), @@ -130,19 +120,30 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable } @Test - public void shouldRescheduleTheJobAfterARun() throws Throwable + public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable { - // given - Runnable scheduledJob = scheduler.getJob(); - assertNotNull( scheduledJob ); + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + + StoreId storeId = new StoreId( 1, 2, 3, 4 ); + ArgumentCaptor captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class ); + + verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), + captor.capture() ); + captor.getValue().onTxPullResponse( null, + new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) ); + + verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), times( 2 ) ).renew(); + } + + @Test + public void shouldRenewTxPullTimeoutOnTick() throws Throwable + { // when - scheduler.runJob(); + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( scheduler, times( 2 ) ).schedule( eq( txPolling ), any( Runnable.class ), - eq( 100L ), eq( TimeUnit.MILLISECONDS ) ); - assertEquals( scheduledJob, scheduler.getJob() ); + verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew(); } @Test @@ -153,14 +154,17 @@ public void shouldCopyStoreIfCatchUpClientFails() throws Throwable any( CatchUpResponseCallback.class ) ) ).thenReturn( CatchupResult.E_TRANSACTION_PRUNED ); // when - scheduler.runJob();; + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then + verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).cancel(); verify( localDatabase ).stop(); verify( startStopOnStoreCopy ).stop(); verify( storeFetcher ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) ); verify( localDatabase ).start(); verify( startStopOnStoreCopy ).start(); verify( txApplier ).refreshFromNewStore(); + verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), + times( 2 ) /* at the beginning and after store copy */ ).renew(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java index 5f41ccecf8aea..e9b3d8b03e208 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java @@ -70,7 +70,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database(); TxPollingClient pollingClient = readReplicaGraphDatabase.getDependencyResolver() .resolveDependency( TxPollingClient.class ); -// pollingClient.pause(); + pollingClient.pause(); cluster.coreTx( ( coreGraphDatabase, transaction ) -> { coreGraphDatabase.createNode(); @@ -92,7 +92,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th } // when the poller is resumed, it does make it to the read replica -// pollingClient.resume(); + pollingClient.resume(); transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) ); }