From 95d6bb0ab7083e46c17955c12f7cb21417fd0f8b Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Tue, 15 Nov 2016 16:05:11 +0000 Subject: [PATCH] Simplify the way we do transaction polling. We don't need to have multiple threads competing to send a TX pull request. Instead, let's just have one thread. --- .../catchup/tx/TxPollingClient.java | 126 +++++++++++------- .../EnterpriseReadReplicaEditionModule.java | 5 +- .../catchup/tx/TxPollingClientTest.java | 50 ++++--- .../scenarios/CausalConsistencyIT.java | 4 +- 4 files changed, 103 insertions(+), 82 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java index 5561d5579186e..601859522202f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.catchup.tx; import java.util.concurrent.CompletableFuture; +import java.util.function.BooleanSupplier; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; @@ -28,20 +29,23 @@ import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; -import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy; import 
org.neo4j.causalclustering.readreplica.CopyStoreSafely; +import org.neo4j.function.Predicates; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; /** * This class is responsible for pulling transactions from a core server and queuing @@ -66,18 +70,33 @@ enum Timeouts implements TimeoutName private final CopiedStoreRecovery copiedStoreRecovery; private final CatchUpClient catchUpClient; private final CoreMemberSelectionStrategy connectionStrategy; - private final RenewableTimeoutService timeoutService; private final long txPullIntervalMillis; private final BatchingTxApplier applier; private final PullRequestMonitor pullRequestMonitor; - private RenewableTimeout timeout; + private volatile JobScheduler.JobHandle handle; + private final JobScheduler scheduler; + private volatile boolean stopped; + private volatile boolean polling; + + public static final JobScheduler.Group txPolling = new JobScheduler.Group( "TxPolling", POOLED ); + + private final BooleanSupplier txPollingCondition = new BooleanSupplier() + { + @Override + public boolean getAsBoolean() + { + return !polling; + } + }; + public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase, - Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, - CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService, - long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, - CopiedStoreRecovery copiedStoreRecovery ) + 
Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, + CoreMemberSelectionStrategy connectionStrategy, + long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, + CopiedStoreRecovery copiedStoreRecovery, + JobScheduler scheduler ) { this.fs = fs; this.localDatabase = localDatabase; @@ -86,45 +105,71 @@ public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, Local this.storeFetcher = storeFetcher; this.catchUpClient = catchUpClient; this.connectionStrategy = connectionStrategy; - this.timeoutService = timeoutService; this.txPullIntervalMillis = txPullIntervalMillis; this.applier = applier; this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); this.copiedStoreRecovery = copiedStoreRecovery; + + this.scheduler = scheduler; } @Override public synchronized void start() throws Throwable { - timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() ); + handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS ); } - /** - * Time to pull! 
- */ - private synchronized void onTimeout() + @Override + public void stop() throws Throwable { - timeout.renew(); - applier.emptyQueueAndResetLastQueuedTxId(); + stopped = true; + if ( handle != null ) + { + handle.cancel( false ); + } + } - try + private final Runnable job = new Runnable() + { + @Override + public void run() { - MemberId core = connectionStrategy.coreMember(); - StoreId localStoreId = localDatabase.storeId(); + try + { + polling = true; + if ( stopped ) + { + return; + } - boolean moreToPull = true; - int batchCount = 1; - while ( moreToPull ) + applier.emptyQueueAndResetLastQueuedTxId(); + MemberId core = connectionStrategy.coreMember(); + StoreId localStoreId = localDatabase.storeId(); + + boolean moreToPull = true; + int batchCount = 1; + while ( moreToPull ) + { + moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount ); + batchCount++; + } + } + catch ( Throwable e ) { - moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount ); - batchCount++; + log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e ); + } + finally + { + polling = false; + } + + // reschedule only if it is not stopped + if ( !stopped ) + { + handle = scheduler.schedule( txPolling, job, txPullIntervalMillis, MILLISECONDS ); } } - catch ( Throwable e ) - { - log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e ); - } - } + }; private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable { @@ -139,9 +184,6 @@ private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localSto public void onTxPullResponse( CompletableFuture signal, TxPullResponse response ) { applier.queue( response.tx() ); - // no applying here, just put it in the batch - - timeout.renew(); } @Override @@ -167,16 +209,13 @@ public void onTxStreamFinishedResponse( CompletableFuture signal, downloadDatabase( 
core, localStoreId ); return false; default: - log.info( "Tx pull unable to get transactions > %d " + lastQueuedTxId ); + log.info( "Tx pull unable to get transactions > %d ", lastQueuedTxId ); return false; } } private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Throwable { - pause(); - try - { localDatabase.stop(); startStopOnStoreCopy.stop(); new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ). @@ -184,20 +223,5 @@ private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Thro localDatabase.start(); startStopOnStoreCopy.start(); applier.refreshFromNewStore(); - } - finally - { - resume(); - } - } - - public void pause() - { - timeout.cancel(); - } - - public void resume() - { - timeout.renew(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 4f23be23ce9f9..6c59bad6765c8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -81,6 +81,7 @@ import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.state.DataSourceManager; +import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DefaultKernelData; import org.neo4j.kernel.lifecycle.LifeSupport; @@ -235,9 +236,9 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data TxPollingClient txPuller = new TxPollingClient( logProvider, fileSystem, localDatabase, servicesToStopOnStoreCopy, storeFetcher, - catchUpClient, new 
ConnectToRandomCoreMember( discoveryService ), txPullerTimeoutService, + catchUpClient, new ConnectToRandomCoreMember( discoveryService ), config.get( CausalClusteringSettings.pull_interval ), batchingTxApplier, - platformModule.monitors, copiedStoreRecovery ); + platformModule.monitors, copiedStoreRecovery, platformModule.jobScheduler ); dependencies.satisfyDependencies( txPuller ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java index 727fb423b22e5..158488da7c982 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java @@ -24,6 +24,7 @@ import org.mockito.ArgumentCaptor; import java.io.File; +import java.util.concurrent.TimeUnit; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpResponseCallback; @@ -38,18 +39,25 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; +import org.neo4j.test.OnDemandJobScheduler; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static 
org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT; +import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.txPolling; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; +import static org.neo4j.kernel.impl.util.JobScheduler.Groups.checkPoint; public class TxPollingClientTest { @@ -67,15 +75,17 @@ public class TxPollingClientTest private final CopiedStoreRecovery copiedStoreRecovery = mock( CopiedStoreRecovery.class ); private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); private final LocalDatabase localDatabase = mock( LocalDatabase.class ); + { when( localDatabase.storeId() ).thenReturn( storeId ); } private final Lifecycle startStopOnStoreCopy = mock( Lifecycle.class ); + private final OnDemandJobScheduler scheduler = spy( new OnDemandJobScheduler()); private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), fs, localDatabase, startStopOnStoreCopy, storeFetcher, - catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors(), - copiedStoreRecovery ); + catchUpClient, serverSelection, txPullIntervalMillis, txApplier, new Monitors(), + copiedStoreRecovery, scheduler ); @Before public void before() throws Throwable @@ -93,7 +103,7 @@ public void shouldSendPullRequestOnTick() throws Throwable when( txApplier.lastQueuedTxId() ).thenReturn( lastAppliedTxId ); // when - timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + scheduler.runJob(); // then verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), @@ -112,7 +122,7 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable any( CatchUpResponseCallback.class ) )) .thenReturn( CatchupResult.SUCCESS_END_OF_BATCH, CatchupResult.SUCCESS_END_OF_STREAM ); - timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + scheduler.runJob(); // then verify( catchUpClient, times( 2 ) ).makeBlockingRequest( any( MemberId.class 
), any( TxPullRequest.class ), @@ -120,30 +130,19 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable } @Test - public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable + public void shouldRescheduleTheJobAfterARun() throws Throwable { - timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); - - StoreId storeId = new StoreId( 1, 2, 3, 4 ); - ArgumentCaptor captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class ); - - verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - captor.capture() ); - - captor.getValue().onTxPullResponse( null, - new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) ); - - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), times( 2 ) ).renew(); - } + // given + Runnable scheduledJob = scheduler.getJob(); + assertNotNull( scheduledJob ); - @Test - public void shouldRenewTxPullTimeoutOnTick() throws Throwable - { // when - timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + scheduler.runJob(); // then - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew(); + verify( scheduler, times( 2 ) ).schedule( eq( txPolling ), any( Runnable.class ), + eq( 100L ), eq( TimeUnit.MILLISECONDS ) ); + assertEquals( scheduledJob, scheduler.getJob() ); } @Test @@ -154,17 +153,14 @@ public void shouldCopyStoreIfCatchUpClientFails() throws Throwable any( CatchUpResponseCallback.class ) ) ).thenReturn( CatchupResult.E_TRANSACTION_PRUNED ); // when - timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + scheduler.runJob();; // then - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).cancel(); verify( localDatabase ).stop(); verify( startStopOnStoreCopy ).stop(); verify( storeFetcher ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) ); verify( localDatabase ).start(); verify( startStopOnStoreCopy ).start(); verify( txApplier ).refreshFromNewStore(); - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), - 
times( 2 ) /* at the beginning and after store copy */ ).renew(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java index e9b3d8b03e208..5f41ccecf8aea 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java @@ -70,7 +70,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database(); TxPollingClient pollingClient = readReplicaGraphDatabase.getDependencyResolver() .resolveDependency( TxPollingClient.class ); - pollingClient.pause(); +// pollingClient.pause(); cluster.coreTx( ( coreGraphDatabase, transaction ) -> { coreGraphDatabase.createNode(); @@ -92,7 +92,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th } // when the poller is resumed, it does make it to the read replica - pollingClient.resume(); +// pollingClient.resume(); transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) ); }