diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java index 4aa3974f79e82..9f4ec7b09d16e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreFetcher.java @@ -76,8 +76,7 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) try { log.info( "Copying store from %s", from ); - long lastFlushedTxId = - storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) ); + long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, expectedStoreId, new StreamToDisk( destDir, fs ) ); // We require at least one transaction for extracting the log index of the consensus log. // Given there might not have been any activity on the source server we need to ask for the diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java index 13642458ddd86..ef8a6b6648ac2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplier.java @@ -26,7 +26,6 @@ import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; -import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; @@ -43,7 +42,6 @@ public class BatchingTxApplier extends LifecycleAdapter private final int maxBatchSize; private final Supplier txIdStoreSupplier; private final Supplier commitProcessSupplier; - private final Supplier healthSupplier; private final PullRequestMonitor monitor; private final Log log; @@ -55,13 +53,12 @@ public class BatchingTxApplier extends LifecycleAdapter private volatile boolean stopped; public BatchingTxApplier( int maxBatchSize, Supplier txIdStoreSupplier, - Supplier commitProcessSupplier, Supplier healthSupplier, + Supplier commitProcessSupplier, Monitors monitors, LogProvider logProvider ) { this.maxBatchSize = maxBatchSize; this.txIdStoreSupplier = txIdStoreSupplier; this.commitProcessSupplier = commitProcessSupplier; - this.healthSupplier = healthSupplier; this.log = logProvider.getLog( getClass() ); this.monitor = monitors.newMonitor( PullRequestMonitor.class ); } @@ -71,8 +68,7 @@ public void start() { stopped = false; refreshFromNewStore(); - txQueue = - new TransactionQueue( maxBatchSize, ( first, last ) -> commitProcess.commit( first, NULL, EXTERNAL ) ); + txQueue = new TransactionQueue( maxBatchSize, ( first, last ) -> commitProcess.commit( first, NULL, EXTERNAL ) ); } @Override @@ -84,19 +80,8 @@ public void stop() void refreshFromNewStore() { assert txQueue == null || txQueue.isEmpty(); - resetLastQueuedTxId(); - commitProcess = commitProcessSupplier.get(); - } - - public void emptyQueueAndResetLastQueuedTxId() - { - applyBatch(); - resetLastQueuedTxId(); - } - - private void resetLastQueuedTxId() - { lastQueuedTxId = txIdStoreSupplier.get().getLastCommittedTransactionId(); + commitProcess = commitProcessSupplier.get(); } /** @@ -104,7 +89,7 @@ private void resetLastQueuedTxId() * * @param tx The transaction to be queued for application. */ - public void queue( CommittedTransactionRepresentation tx ) + public void queue( CommittedTransactionRepresentation tx ) throws Exception { long receivedTxId = tx.getCommitEntry().getTxId(); long expectedTxId = lastQueuedTxId + 1; @@ -115,15 +100,7 @@ public void queue( CommittedTransactionRepresentation tx ) return; } - try - { - txQueue.queue( new TransactionToApply( tx.getTransactionRepresentation(), receivedTxId ) ); - } - catch ( Exception e ) - { - log.error( "Error while queueing transaction", e ); - healthSupplier.get().panic( e ); - } + txQueue.queue( new TransactionToApply( tx.getTransactionRepresentation(), receivedTxId ) ); if ( !stopped ) { @@ -132,17 +109,9 @@ public void queue( CommittedTransactionRepresentation tx ) } } - void applyBatch() + void applyBatch() throws Exception { - try - { - txQueue.empty(); - } - catch ( Exception e ) - { - log.error( "Error during transaction application", e ); - healthSupplier.get().panic( e ); - } + txQueue.empty(); } /** diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java new file mode 100644 index 0000000000000..4664afd69f1db --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.java @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.catchup.tx; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import org.neo4j.causalclustering.catchup.CatchUpClient; +import org.neo4j.causalclustering.catchup.CatchUpClientException; +import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; +import org.neo4j.causalclustering.catchup.CatchupResult; +import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery; +import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; +import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException; +import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher; +import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException; +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; +import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionException; +import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy; +import org.neo4j.causalclustering.readreplica.CopyStoreSafely; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.Lifecycle; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC; +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING; +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.TX_PULLING; +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.Timeouts.TX_PULLER_TIMEOUT; + +/** + * This class is responsible for pulling transactions from a core server and queuing + * them to be applied with the {@link BatchingTxApplier}. Pull requests are issued on + * a fixed interval. + * + * If the necessary transactions are not remotely available then a fresh copy of the + * entire store will be pulled down. + */ +public class CatchupPollingProcess extends LifecycleAdapter +{ + enum Timeouts implements TimeoutName + { + TX_PULLER_TIMEOUT + } + + enum State + { + TX_PULLING, + STORE_COPYING, + PANIC + } + + private final FileSystemAbstraction fs; + private final LocalDatabase localDatabase; + private final Log log; + private final Lifecycle startStopOnStoreCopy; + private final StoreFetcher storeFetcher; + private final CopiedStoreRecovery copiedStoreRecovery; + private final Supplier databaseHealthSupplier; + private final CatchUpClient catchUpClient; + private final CoreMemberSelectionStrategy connectionStrategy; + private final RenewableTimeoutService timeoutService; + private final long txPullIntervalMillis; + private final BatchingTxApplier applier; + private final PullRequestMonitor pullRequestMonitor; + + private RenewableTimeout timeout; + private State state = TX_PULLING; + private DatabaseHealth dbHealth; + + public CatchupPollingProcess( LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase, + Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, + CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService, + long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, + CopiedStoreRecovery copiedStoreRecovery, Supplier databaseHealthSupplier ) + { + this.fs = fs; + this.localDatabase = localDatabase; + this.log = logProvider.getLog( getClass() ); + this.startStopOnStoreCopy = startStopOnStoreCopy; + this.storeFetcher = storeFetcher; + this.catchUpClient = catchUpClient; + this.connectionStrategy = connectionStrategy; + this.timeoutService = timeoutService; + this.txPullIntervalMillis = txPullIntervalMillis; + this.applier = applier; + this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); + this.copiedStoreRecovery = copiedStoreRecovery; + this.databaseHealthSupplier = databaseHealthSupplier; + } + + @Override + public synchronized void start() throws Throwable + { + timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() ); + dbHealth = databaseHealthSupplier.get(); + } + + @Override + public void stop() throws Throwable + { + timeout.cancel(); + } + + public State state() + { + return state; + } + + /** + * Time to catchup! + */ + private void onTimeout() + { + try + { + switch ( state ) + { + case TX_PULLING: + pullTransactions(); + break; + + case STORE_COPYING: + copyStore(); + break; + + default: + throw new IllegalStateException( "Tried to execute catchup but was in state " + state ); + } + } + catch ( Throwable e ) + { + panic( e ); + } + + if ( state != PANIC ) + { + timeout.renew(); + } + } + + private synchronized void panic( Throwable e ) + { + log.error( "Unexpected issue in catchup process. No more catchup requests will be scheduled.", e ); + dbHealth.panic( e ); + state = PANIC; + } + + private void pullTransactions() + { + MemberId core; + try + { + core = connectionStrategy.coreMember(); + } + catch ( CoreMemberSelectionException e ) + { + log.warn( "Could not find core member to pull from", e ); + return; + } + + StoreId localStoreId = localDatabase.storeId(); + + boolean moreToPull = true; + int batchCount = 1; + while ( moreToPull ) + { + moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount ); + batchCount++; + } + } + + private synchronized void handleTransaction( CommittedTransactionRepresentation tx ) + { + if ( state == PANIC ) + { + return; + } + + try + { + applier.queue( tx ); + } + catch ( Throwable e ) + { + panic( e ); + } + } + + private synchronized void streamComplete() + { + if ( state == PANIC ) + { + return; + } + + try + { + applier.applyBatch(); + } + catch ( Throwable e ) + { + panic( e ); + } + } + + private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) + { + long lastQueuedTxId = applier.lastQueuedTxId(); + pullRequestMonitor.txPullRequest( lastQueuedTxId ); + TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId ); + log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount ); + + CatchupResult catchupResult; + try + { + catchupResult = catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor() + { + @Override + public void onTxPullResponse( CompletableFuture signal, TxPullResponse response ) + { + handleTransaction( response.tx() ); + } + + @Override + public void onTxStreamFinishedResponse( CompletableFuture signal, + TxStreamFinishedResponse response ) + { + streamComplete(); + signal.complete( response.status() ); + } + } ); + } + catch ( CatchUpClientException e ) + { + streamComplete(); + return false; + } + + switch ( catchupResult ) + { + case SUCCESS_END_OF_BATCH: + return true; + case SUCCESS_END_OF_STREAM: + log.debug( "Successfully pulled transactions from %d", lastQueuedTxId ); + return false; + case E_TRANSACTION_PRUNED: + log.info( "Tx pull unable to get transactions starting from %d since transactions " + + "have been pruned. Attempting a store copy.", lastQueuedTxId ) ; + state = STORE_COPYING; + return false; + default: + log.info( "Tx pull request unable to get transactions > %d " + lastQueuedTxId ); + return false; + } + } + + private void copyStore() + { + MemberId core; + try + { + core = connectionStrategy.coreMember(); + } + catch ( CoreMemberSelectionException e ) + { + log.warn( "Could not find core member from which to copy store", e ); + return; + } + + StoreId localStoreId = localDatabase.storeId(); + downloadDatabase( core, localStoreId ); + } + + private void downloadDatabase( MemberId core, StoreId localStoreId ) + { + try + { + localDatabase.stop(); + startStopOnStoreCopy.stop(); + } + catch ( Throwable throwable ) + { + throw new RuntimeException( throwable ); + } + + try + { + new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ). + copyWholeStoreFrom( core, localStoreId, storeFetcher ); + } + catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e ) + { + log.warn( String.format( "Error copying store from: %s. Will retry shortly.", core ) ); + return; + } + + try + { + localDatabase.start(); + startStopOnStoreCopy.start(); + } + catch ( Throwable throwable ) + { + throw new RuntimeException( throwable ); + } + + state = TX_PULLING; + applier.refreshFromNewStore(); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java deleted file mode 100644 index 5561d5579186e..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPollingClient.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.catchup.tx; - -import java.util.concurrent.CompletableFuture; - -import org.neo4j.causalclustering.catchup.CatchUpClient; -import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor; -import org.neo4j.causalclustering.catchup.CatchupResult; -import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery; -import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; -import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher; -import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService; -import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.RenewableTimeout; -import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService.TimeoutName; -import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.identity.StoreId; -import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionStrategy; -import org.neo4j.causalclustering.readreplica.CopyStoreSafely; -import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.lifecycle.Lifecycle; -import org.neo4j.kernel.lifecycle.LifecycleAdapter; -import org.neo4j.kernel.monitoring.Monitors; -import org.neo4j.logging.Log; -import org.neo4j.logging.LogProvider; - -import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT; - -/** - * This class is responsible for pulling transactions from a core server and queuing - * them to be applied with the {@link BatchingTxApplier}. - * - * Pull requests are issued on a fixed interval, but skipped if the {@link BatchingTxApplier} - * isn't yet finished with the current work. - */ -public class TxPollingClient extends LifecycleAdapter -{ - - enum Timeouts implements TimeoutName - { - TX_PULLER_TIMEOUT - } - - private final FileSystemAbstraction fs; - private final LocalDatabase localDatabase; - private final Log log; - private final Lifecycle startStopOnStoreCopy; - private final StoreFetcher storeFetcher; - private final CopiedStoreRecovery copiedStoreRecovery; - private final CatchUpClient catchUpClient; - private final CoreMemberSelectionStrategy connectionStrategy; - private final RenewableTimeoutService timeoutService; - private final long txPullIntervalMillis; - private final BatchingTxApplier applier; - private final PullRequestMonitor pullRequestMonitor; - - private RenewableTimeout timeout; - - public TxPollingClient( LogProvider logProvider, FileSystemAbstraction fs, LocalDatabase localDatabase, - Lifecycle startStopOnStoreCopy, StoreFetcher storeFetcher, CatchUpClient catchUpClient, - CoreMemberSelectionStrategy connectionStrategy, RenewableTimeoutService timeoutService, - long txPullIntervalMillis, BatchingTxApplier applier, Monitors monitors, - CopiedStoreRecovery copiedStoreRecovery ) - { - this.fs = fs; - this.localDatabase = localDatabase; - this.log = logProvider.getLog( getClass() ); - this.startStopOnStoreCopy = startStopOnStoreCopy; - this.storeFetcher = storeFetcher; - this.catchUpClient = catchUpClient; - this.connectionStrategy = connectionStrategy; - this.timeoutService = timeoutService; - this.txPullIntervalMillis = txPullIntervalMillis; - this.applier = applier; - this.pullRequestMonitor = monitors.newMonitor( PullRequestMonitor.class ); - this.copiedStoreRecovery = copiedStoreRecovery; - } - - @Override - public synchronized void start() throws Throwable - { - timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() ); - } - - /** - * Time to pull! - */ - private synchronized void onTimeout() - { - timeout.renew(); - applier.emptyQueueAndResetLastQueuedTxId(); - - try - { - MemberId core = connectionStrategy.coreMember(); - StoreId localStoreId = localDatabase.storeId(); - - boolean moreToPull = true; - int batchCount = 1; - while ( moreToPull ) - { - moreToPull = pullAndApplyBatchOfTransactions( core, localStoreId, batchCount ); - batchCount++; - } - } - catch ( Throwable e ) - { - log.warn( "Tx pull attempt failed, will retry at the next regularly scheduled polling attempt.", e ); - } - } - - private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localStoreId, int batchCount ) throws Throwable - { - long lastQueuedTxId = applier.lastQueuedTxId(); - pullRequestMonitor.txPullRequest( lastQueuedTxId ); - TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId ); - log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount ); - CatchupResult catchupResult = - catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor() - { - @Override - public void onTxPullResponse( CompletableFuture signal, TxPullResponse response ) - { - applier.queue( response.tx() ); - // no applying here, just put it in the batch - - timeout.renew(); - } - - @Override - public void onTxStreamFinishedResponse( CompletableFuture signal, - TxStreamFinishedResponse response ) - { - // apply the batch here - applier.applyBatch(); - signal.complete( response.status() ); - } - } ); - - switch ( catchupResult ) - { - case SUCCESS_END_OF_BATCH: - return true; - case SUCCESS_END_OF_STREAM: - log.debug( "Successfully pulled transactions from %d", lastQueuedTxId ); - return false; - case E_TRANSACTION_PRUNED: - log.info( "Tx pull unable to get transactions starting from %d since transactions " + - "have been pruned. Attempting a store copy.", lastQueuedTxId ) ; - downloadDatabase( core, localStoreId ); - return false; - default: - log.info( "Tx pull unable to get transactions > %d " + lastQueuedTxId ); - return false; - } - } - - private void downloadDatabase( MemberId core, StoreId localStoreId ) throws Throwable - { - pause(); - try - { - localDatabase.stop(); - startStopOnStoreCopy.stop(); - new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log ). - copyWholeStoreFrom( core, localStoreId, storeFetcher ); - localDatabase.start(); - startStopOnStoreCopy.start(); - applier.refreshFromNewStore(); - } - finally - { - resume(); - } - } - - public void pause() - { - timeout.cancel(); - } - - public void resume() - { - timeout.renew(); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index b273093dc8ad0..bbb6683e3918b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -33,7 +33,7 @@ import org.neo4j.causalclustering.catchup.storecopy.StoreFiles; import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; -import org.neo4j.causalclustering.catchup.tx.TxPollingClient; +import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService; @@ -188,7 +188,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke int maxBatchSize = config.get( CausalClusteringSettings.read_replica_transaction_applier_batch_size ); BatchingTxApplier batchingTxApplier = new BatchingTxApplier( maxBatchSize, dependencies.provideDependency( TransactionIdStore.class ), - writableCommitProcess, databaseHealthSupplier, platformModule.monitors, logProvider ); + writableCommitProcess, platformModule.monitors, logProvider ); DelayedRenewableTimeoutService txPullerTimeoutService = new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ); @@ -234,11 +234,11 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data } ); } - TxPollingClient txPuller = - new TxPollingClient( logProvider, fileSystem, localDatabase, servicesToStopOnStoreCopy, storeFetcher, + CatchupPollingProcess txPuller = + new CatchupPollingProcess( logProvider, fileSystem, localDatabase, servicesToStopOnStoreCopy, storeFetcher, catchUpClient, new ConnectToRandomCoreMember( discoveryService ), txPullerTimeoutService, config.get( CausalClusteringSettings.pull_interval ), batchingTxApplier, - platformModule.monitors, copiedStoreRecovery ); + platformModule.monitors, copiedStoreRecovery, databaseHealthSupplier ); dependencies.satisfyDependencies( txPuller ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplierTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplierTest.java index e221763c2eaf2..01c7ea010b786 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplierTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/BatchingTxApplierTest.java @@ -59,7 +59,7 @@ public class BatchingTxApplierTest private final int maxBatchSize = 16; private final BatchingTxApplier txApplier = new BatchingTxApplier( maxBatchSize, () -> idStore, () -> commitProcess, - () -> dbHealth, new Monitors(), NullLogProvider.getInstance() ); + new Monitors(), NullLogProvider.getInstance() ); @Before public void before() throws Throwable @@ -136,20 +136,6 @@ public void shouldBeAbleToQueueMaxBatchSize() throws Exception assertTransactionsCommitted( startTxId + 1, maxBatchSize ); } - @Test - public void shouldPanicIfTransactionFailsToApply() throws Throwable - { - // given - doThrow( Exception.class ).when( commitProcess ).commit( any(), any(), any() ); - txApplier.queue( createTxWithId( startTxId + 1 ) ); - - // when - txApplier.applyBatch(); - - // then - verify( dbHealth ).panic( any() ); - } - @Test( timeout = 3_000 ) public void shouldGiveUpQueueingOnStop() throws Throwable { @@ -167,7 +153,14 @@ public void shouldGiveUpQueueingOnStop() throws Throwable public void run() { latch.countDown(); - txApplier.queue( createTxWithId( startTxId + maxBatchSize + 1 ) ); + try + { + txApplier.queue( createTxWithId( startTxId + maxBatchSize + 1 ) ); + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } } }; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java similarity index 71% rename from enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java rename to enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java index 727fb423b22e5..16bef4c8e390f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPollingClientTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/CatchupPollingProcessTest.java @@ -24,6 +24,7 @@ import org.mockito.ArgumentCaptor; import java.io.File; +import java.util.concurrent.CompletableFuture; import org.neo4j.causalclustering.catchup.CatchUpClient; import org.neo4j.causalclustering.catchup.CatchUpResponseCallback; @@ -38,20 +39,29 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.neo4j.causalclustering.catchup.tx.TxPollingClient.Timeouts.TX_PULLER_TIMEOUT; + +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC; +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING; +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.TX_PULLING; +import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.Timeouts.TX_PULLER_TIMEOUT; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; -public class TxPollingClientTest +public class CatchupPollingProcessTest { private final CatchUpClient catchUpClient = mock( CatchUpClient.class ); private final CoreMemberSelectionStrategy serverSelection = mock( CoreMemberSelectionStrategy.class ); @@ -72,10 +82,10 @@ public class TxPollingClientTest } private final Lifecycle startStopOnStoreCopy = mock( Lifecycle.class ); - private final TxPollingClient txPuller = - new TxPollingClient( NullLogProvider.getInstance(), fs, localDatabase, startStopOnStoreCopy, storeFetcher, + private final CatchupPollingProcess txPuller = + new CatchupPollingProcess( NullLogProvider.getInstance(), fs, localDatabase, startStopOnStoreCopy, storeFetcher, catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors(), - copiedStoreRecovery ); + copiedStoreRecovery, () -> mock( DatabaseHealth.class) ); @Before public void before() throws Throwable @@ -120,51 +130,72 @@ public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable } @Test - public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable + public void shouldRenewTxPullTimeoutOnSuccessfulTxPulling() throws Throwable { - timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); - - StoreId storeId = new StoreId( 1, 2, 3, 4 ); - ArgumentCaptor captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class ); - - verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), - captor.capture() ); + // when + when( catchUpClient .makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), + any(CatchUpResponseCallback.class))) + .thenReturn( CatchupResult.SUCCESS_END_OF_STREAM ); - captor.getValue().onTxPullResponse( null, - new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) ); + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), times( 2 ) ).renew(); + // then + verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew(); } @Test - public void shouldRenewTxPullTimeoutOnTick() throws Throwable + public void nextStateShouldBeStoreCopyingIfRequestedTransactionHasBeenPrunedAway() throws Exception { // when + when( catchUpClient.makeBlockingRequest( + any( MemberId.class ), any( TxPullRequest.class ), any( CatchUpResponseCallback.class ) ) ) + .thenReturn( CatchupResult.E_TRANSACTION_PRUNED ); + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).renew(); + assertEquals( STORE_COPYING, txPuller.state() ); } @Test - public void shouldCopyStoreIfCatchUpClientFails() throws Throwable + public void nextStateShouldBeTxPullingAfterASuccessfulStoreCopy() throws Throwable { // given when( catchUpClient.makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), any( CatchUpResponseCallback.class ) ) ).thenReturn( CatchupResult.E_TRANSACTION_PRUNED ); - // when + // when (tx pull) + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + + // when (store copy) timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); // then - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ) ).cancel(); verify( localDatabase ).stop(); verify( startStopOnStoreCopy ).stop(); verify( storeFetcher ).copyStore( any( MemberId.class ), eq( storeId ), any( File.class ) ); verify( localDatabase ).start(); verify( startStopOnStoreCopy ).start(); verify( txApplier ).refreshFromNewStore(); - verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), - times( 2 ) /* at the beginning and after store copy */ ).renew(); + + // then + assertEquals( TX_PULLING, txPuller.state() ); + } + + @Test + public void shouldNotRenewTheTimeoutIfInPanicState() throws Exception + { + // given + CatchUpResponseCallback callback = mock( CatchUpResponseCallback.class ); + + doThrow( new RuntimeException( "Panic all the things" ) ).when( callback ) + .onTxPullResponse( any( CompletableFuture.class ), any( TxPullResponse.class ) ); + + // when + timeoutService.invokeTimeout( TX_PULLER_TIMEOUT ); + + // then + assertEquals( PANIC, txPuller.state() ); + verify( timeoutService.getTimeout( TX_PULLER_TIMEOUT ), never() ).renew(); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java index ad992d5e2d652..da04b43cf067e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.function.IntFunction; -import org.neo4j.causalclustering.catchup.tx.TxPollingClient; +import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase; import org.neo4j.graphdb.factory.GraphDatabaseSettings; @@ -118,9 +118,9 @@ public void shutdown() database = null; } - public TxPollingClient txPollingClient() + public CatchupPollingProcess txPollingClient() { - return database.getDependencyResolver().resolveDependency( TxPollingClient.class ); + return database.getDependencyResolver().resolveDependency( CatchupPollingProcess.class ); } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java index e9b3d8b03e208..80e5d1302518b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CausalConsistencyIT.java @@ -22,7 +22,7 @@ import org.junit.Rule; import org.junit.Test; -import org.neo4j.causalclustering.catchup.tx.TxPollingClient; +import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.core.CoreGraphDatabase; import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase; @@ -62,15 +62,15 @@ public void transactionsCommittedInTheCoreShouldAppearOnTheReadReplica() throws } @Test - public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() throws Exception + public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() throws Throwable { // given Cluster cluster = clusterRule.startCluster(); ReadReplicaGraphDatabase readReplicaGraphDatabase = cluster.findAnyReadReplica().database(); - TxPollingClient pollingClient = readReplicaGraphDatabase.getDependencyResolver() - .resolveDependency( TxPollingClient.class ); - pollingClient.pause(); + CatchupPollingProcess pollingClient = readReplicaGraphDatabase.getDependencyResolver() + .resolveDependency( CatchupPollingProcess.class ); + pollingClient.stop(); cluster.coreTx( ( coreGraphDatabase, transaction ) -> { coreGraphDatabase.createNode(); @@ -92,7 +92,7 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th } // when the poller is resumed, it does make it to the read replica - pollingClient.resume(); + pollingClient.start(); transactionIdTracker( readReplicaGraphDatabase ).awaitUpToDate( transactionVisibleOnLeader, ofSeconds( 3 ) ); } diff --git a/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java b/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java index 3969731480182..02980fc705c99 100644 --- a/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java +++ b/integrationtests/src/test/java/org/neo4j/causalclustering/scenarios/BoltCausalClusteringIT.java @@ -525,7 +525,7 @@ public void shouldUseBookmarkFromAReadSessionInAWriteSession() throws Exception } @Test - public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception + public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Throwable { // given cluster = clusterRule.withNumberOfReadReplicas( 1 ).startCluster(); @@ -533,7 +533,7 @@ public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception CoreClusterMember leader = cluster.awaitLeader(); ReadReplica readReplica = cluster.getReadReplicaById( 0 ); - readReplica.txPollingClient().pause(); + readReplica.txPollingClient().stop(); Driver driver = GraphDatabase.driver( leader.directURI(), AuthTokens.basic( "neo4j", "neo4j" ) ); @@ -552,7 +552,7 @@ public void shouldUseBookmarkFromAWriteSessionInAReadSession() throws Exception } ); assertNotNull( bookmark ); - readReplica.txPollingClient().resume(); + readReplica.txPollingClient().start(); driver = GraphDatabase.driver( readReplica.directURI(), AuthTokens.basic( "neo4j", "neo4j" ) );