From da3a460a7e4481534a8e19b73b0c2c6ede973ae8 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Mon, 8 Oct 2018 16:24:47 +0200 Subject: [PATCH] Assert local database is healthy in RaftReplicator This ensures that the replication is aborted if database is not healthy. --- .../causalclustering/ReplicationModule.java | 7 +- .../core/EnterpriseCoreEditionModule.java | 2 +- .../core/replication/RaftReplicator.java | 6 +- .../ReplicationFailureException.java | 4 +- .../core/replication/RaftReplicatorTest.java | 109 +++++++++++++----- 5 files changed, 91 insertions(+), 37 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index e3eec8c42385..5f2779f91bff 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -26,6 +26,7 @@ import java.time.Duration; import java.util.UUID; +import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.RaftMessages; @@ -55,8 +56,8 @@ public class ReplicationModule private final SessionTracker sessionTracker; public ReplicationModule( RaftMachine raftMachine, MemberId myself, PlatformModule platformModule, Config config, - Outbound outbound, - File clusterStateDirectory, FileSystemAbstraction fileSystem, LogProvider logProvider, AvailabilityGuard globalAvailabilityGuard ) + Outbound outbound, File clusterStateDirectory, FileSystemAbstraction fileSystem, LogProvider logProvider, + AvailabilityGuard globalAvailabilityGuard, LocalDatabase localDatabase ) { LifeSupport life = platformModule.life; @@ -82,7 +83,7 @@ public ReplicationModule( RaftMachine raftMachine, MemberId myself, PlatformModu outbound, sessionPool, progressTracker, progressRetryStrategy, availabilityTimeoutMillis, - globalAvailabilityGuard, logProvider, + globalAvailabilityGuard, logProvider, localDatabase, platformModule.monitors ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index bcd101d7f509..1483be65c6a8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -299,7 +299,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, dependencies.satisfyDependency( consensusModule.raftMachine() ); replicationModule = new ReplicationModule( consensusModule.raftMachine(), identityModule.myself(), platformModule, config, loggingOutbound, - clusterStateDirectory.get(), fileSystem, logProvider, globalGuard ); + clusterStateDirectory.get(), fileSystem, logProvider, globalGuard, localDatabase ); coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(), platformModule, clusterStateDirectory.get(), config, replicationModule.getReplicator(), diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 9484a44aa68c..ab11b0e2d5fa 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -25,6 +25,7 @@ import java.util.concurrent.Future; import java.util.function.BiConsumer; +import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.LeaderLocator; @@ -53,13 +54,14 @@ public class RaftReplicator implements Replicator, LeaderListener private final TimeoutStrategy progressTimeoutStrategy; private final AvailabilityGuard availabilityGuard; private final Log log; + private final LocalDatabase localDatabase; private final ReplicationMonitor replicationMonitor; private final long availabilityTimeoutMillis; private final LeaderProvider leaderProvider; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, long availabilityTimeoutMillis, AvailabilityGuard availabilityGuard, - LogProvider logProvider, Monitors monitors ) + LogProvider logProvider, LocalDatabase localDatabase, Monitors monitors ) { this.me = me; this.outbound = outbound; @@ -69,6 +71,7 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound databaseHealth, databaseAvailabilityGuard ); + localDatabase.start(); + } @Test - public void shouldSendReplicatedContentToLeader() throws Exception + void shouldSendReplicatedContentToLeader() throws Exception { // given Monitors monitors = new Monitors(); @@ -118,7 +135,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception } @Test - public void shouldResendAfterTimeout() throws Exception + void shouldResendAfterTimeout() throws Exception { // given Monitors monitors = new Monitors(); @@ -149,7 +166,7 @@ public void shouldResendAfterTimeout() throws Exception } @Test - public void shouldReleaseSessionWhenFinished() throws Exception + void shouldReleaseSessionWhenFinished() throws Exception { // given CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); @@ -178,7 +195,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception } @Test - public void stopReplicationOnShutdown() throws InterruptedException + void stopReplicationOnShutdown() throws InterruptedException { // given Monitors monitors = new Monitors(); @@ -206,7 +223,7 @@ public void stopReplicationOnShutdown() throws InterruptedException } @Test - public void stopReplicationWhenUnavailable() throws InterruptedException + void stopReplicationWhenUnavailable() throws InterruptedException { CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); @@ -226,29 +243,41 @@ public void stopReplicationWhenUnavailable() throws InterruptedException } @Test - public void shouldFailIfNoLeaderIsAvailable() + void stopReplicationWhenUnHealthy() throws InterruptedException + { + CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); + CapturingOutbound outbound = new CapturingOutbound<>(); + + RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() ); + replicator.onLeaderSwitch( leaderInfo ); + + ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); + ReplicatingThread replicatingThread = replicatingThread( replicator, content, true ); + + // when + replicatingThread.start(); + + databaseHealth.panic( new IllegalStateException( "PANIC" ) ); + replicatingThread.join(); + Assertions.assertNotNull( replicatingThread.getReplicationException() ); + } + + @Test + void shouldFailIfNoLeaderIsAvailable() { // given CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() ); + ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); // when - try - { - ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); - replicator.replicate( content, true ); - fail( "should have thrown" ); - } - catch ( ReplicationFailureException ignored ) - { - // expected - } + assertThrows( ReplicationFailureException.class, () -> replicator.replicate( content, true ) ); } @Test - public void shouldListenToLeaderUpdates() throws ReplicationFailureException + void shouldListenToLeaderUpdates() throws ReplicationFailureException { OneProgressTracker oneProgressTracker = new OneProgressTracker(); oneProgressTracker.last.setReplicated(); @@ -271,7 +300,7 @@ public void shouldListenToLeaderUpdates() throws ReplicationFailureException } @Test - public void shouldSuccefulltSendIfLeaderIsLostAndFound() throws InterruptedException + void shouldSuccessfullySendIfLeaderIsLostAndFound() throws InterruptedException { OneProgressTracker capturedProgress = new OneProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); @@ -297,7 +326,7 @@ public void shouldSuccefulltSendIfLeaderIsLostAndFound() throws InterruptedExcep private RaftReplicator getReplicator( CapturingOutbound outbound, ProgressTracker progressTracker, Monitors monitors ) { return new RaftReplicator( leaderLocator, myself, outbound, sessionPool, progressTracker, noWaitTimeoutStrategy, 10, databaseAvailabilityGuard, - NullLogProvider.getInstance(), monitors ); + NullLogProvider.getInstance(), localDatabase, monitors ); } private ReplicatingThread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult ) @@ -423,4 +452,22 @@ public void send( MemberId to, MESSAGE message, boolean block ) } } + + private static class StubLocalDatabase extends LocalDatabase + { + static LocalDatabase create( Supplier databaseHealthSupplier, AvailabilityGuard availabilityGuard ) throws IOException + { + StoreFiles storeFiles = mock( StoreFiles.class ); + when( storeFiles.readStoreId( any() ) ).thenReturn( new StoreId( 1, 2, 3, 4 ) ); + + DataSourceManager dataSourceManager = mock( DataSourceManager.class ); + return new StubLocalDatabase( storeFiles, dataSourceManager, databaseHealthSupplier, availabilityGuard ); + } + + StubLocalDatabase( StoreFiles storeFiles, DataSourceManager dataSourceManager, Supplier databaseHealthSupplier, + AvailabilityGuard availabilityGuard ) + { + super( null, storeFiles, null, dataSourceManager, databaseHealthSupplier, availabilityGuard, NullLogProvider.getInstance() ); + } + } }