From 719efaf42177eb5e20948d4650a0402093b61197 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Thu, 30 Nov 2017 14:57:54 +0100 Subject: [PATCH] Core state downloader will now retry indefinitely When the raft message handler need a new snapshot it will schedule this on the downloader service. This will pause the application process until download is successful. --- .../core/server/CoreServerModule.java | 8 +- .../core/state/RaftMessageHandler.java | 29 +-- .../state/snapshot/CoreStateDownloader.java | 15 +- .../snapshot/CoreStateDownloaderService.java | 66 +++++ .../PersistentSnapshotDownloader.java | 115 +++++++++ .../CoreStateDownloaderServiceTest.java | 174 ++++++++++++++ .../snapshot/CoreStateDownloaderTest.java | 3 +- .../PersistentSnapshotDownloaderTest.java | 227 ++++++++++++++++++ 8 files changed, 596 insertions(+), 41 deletions(-) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index aa39967d01850..d317503e9550c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -55,6 +55,7 @@ import org.neo4j.causalclustering.core.state.RaftMessageHandler; import 
org.neo4j.causalclustering.core.state.machines.CoreStateMachinesModule; import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloader; +import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService; import org.neo4j.causalclustering.core.state.storage.DurableStateStorage; import org.neo4j.causalclustering.core.state.storage.StateStorage; import org.neo4j.causalclustering.discovery.TopologyService; @@ -177,10 +178,13 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, servicesToStopOnStoreCopy, remoteStore, catchUpClient, logProvider, storeCopyProcess, coreStateMachinesModule.coreStateMachines, - snapshotService, commandApplicationProcess, topologyService ); + snapshotService, topologyService ); + + CoreStateDownloaderService downloadService = new CoreStateDownloaderService( platformModule + .jobScheduler, downloader, commandApplicationProcess, logProvider ); RaftMessageHandler messageHandler = new RaftMessageHandler( localDatabase, logProvider, - consensusModule.raftMachine(), downloader, commandApplicationProcess ); + consensusModule.raftMachine(), downloadService, commandApplicationProcess ); int queueSize = config.get( CausalClusteringSettings.raft_in_queue_size ); int maxBatch = config.get( CausalClusteringSettings.raft_in_queue_max_batch ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageHandler.java index 96f149fa3a228..ca280d0b24aaf 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageHandler.java @@ -22,13 +22,11 @@ import java.util.concurrent.TimeoutException; import 
org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
-import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
 import org.neo4j.causalclustering.core.consensus.RaftMachine;
 import org.neo4j.causalclustering.core.consensus.RaftMessages;
 import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome;
-import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloader;
+import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService;
 import org.neo4j.causalclustering.identity.ClusterId;
-import org.neo4j.causalclustering.identity.MemberId;
 import org.neo4j.causalclustering.messaging.Inbound;
 import org.neo4j.logging.Log;
 import org.neo4j.logging.LogProvider;
@@ -38,19 +36,19 @@ public class RaftMessageHandler implements Inbound.MessageHandler.
+ */
+package org.neo4j.causalclustering.core.state.snapshot;
+
+import org.neo4j.causalclustering.core.consensus.LeaderLocator;
+import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
+import org.neo4j.kernel.impl.util.JobScheduler;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;
+
+/**
+ * Schedules snapshot downloads on the job scheduler, keeping at most one download job active at a time.
+ * <p>
+ * A new {@link PersistentSnapshotDownloader} is only created and scheduled when no job has been scheduled yet
+ * or when the previously scheduled job reports that it has completed.
+ */
+public class CoreStateDownloaderService
+{
+    static final String OPERATION_NAME = "download of snapshot";
+
+    private final JobScheduler jobScheduler;
+    private final CoreStateDownloader downloader;
+    private final CommandApplicationProcess applicationProcess;
+    private final Log log;
+    private final JobScheduler.Group downloadSnapshotGroup;
+    // Guarded by "this": only read and written inside the synchronized scheduleDownload method.
+    private PersistentSnapshotDownloader currentJob;
+
+    /**
+     * @param jobScheduler scheduler that the download jobs are handed to
+     * @param downloader downloader passed on to every download job
+     * @param applicationProcess application process passed on to every download job
+     * @param logProvider provider of the log passed on to every download job
+     */
+    public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader,
+            CommandApplicationProcess applicationProcess,
+            LogProvider logProvider )
+    {
+        this.jobScheduler = jobScheduler;
+        this.downloader = downloader;
+        this.applicationProcess = applicationProcess;
+        this.log = logProvider.getLog( getClass() );
+        this.downloadSnapshotGroup = new JobScheduler.Group( "download snapshot", POOLED );
+    }
+
+    /**
+     * Schedules a snapshot download unless one is already scheduled and has not yet completed.
+     * <p>
+     * The whole check-then-schedule sequence runs under this object's monitor. The previous version checked
+     * the non-volatile {@code currentJob} outside the monitor first, which gives no visibility guarantee for
+     * a job assigned by another thread.
+     *
+     * @param leaderLocator used by the download job to find the member to download from
+     */
+    public synchronized void scheduleDownload( LeaderLocator leaderLocator )
+    {
+        if ( currentJob == null || currentJob.hasCompleted() )
+        {
+            currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log );
+            jobScheduler.schedule( downloadSnapshotGroup, currentJob );
+        }
+    }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java
new file mode 100644
index 0000000000000..ce7047e99b2a9
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.causalclustering.core.state.snapshot;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
+import org.neo4j.causalclustering.core.consensus.LeaderLocator;
+import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
+import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
+import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
+import org.neo4j.causalclustering.helper.TimeoutStrategy;
+import org.neo4j.logging.Log;
+
+/**
+ * Job that keeps trying to download a snapshot from the current leader until it succeeds or the executing
+ * thread is interrupted.
+ * <p>
+ * The applier is paused before the first attempt and resumed only after a successful download. After a failed
+ * attempt the thread parks for the number of milliseconds given by the timeout, which is then incremented.
+ */
+class PersistentSnapshotDownloader implements Runnable
+{
+    private final CommandApplicationProcess applicationProcess;
+    private final LeaderLocator leaderLocator;
+    private final CoreStateDownloader downloader;
+    private final Log log;
+    private final TimeoutStrategy.Timeout timeout;
+    // Written by the thread executing run() and read by other threads through isRunning()/hasCompleted(),
+    // hence volatile so that those readers observe the latest value.
+    private volatile State state;
+
+    /**
+     * @param leaderLocator used to look up the member to download from on every attempt
+     * @param applicationProcess application process that is paused while downloading
+     * @param downloader performs the actual snapshot download
+     * @param log log that failed attempts are reported to
+     * @param timeout supplies the pause between attempts and is incremented after every failed attempt
+     */
+    PersistentSnapshotDownloader( LeaderLocator leaderLocator,
+            CommandApplicationProcess applicationProcess, CoreStateDownloader downloader, Log log,
+            TimeoutStrategy.Timeout timeout )
+    {
+        this.applicationProcess = applicationProcess;
+        this.leaderLocator = leaderLocator;
+        this.downloader = downloader;
+        this.log = log;
+        this.timeout = timeout;
+        this.state = State.INITIATED;
+    }
+
+    /**
+     * Same as the five-argument constructor, using a timeout created from
+     * {@code new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS )}.
+     */
+    PersistentSnapshotDownloader( LeaderLocator leaderLocator,
+            CommandApplicationProcess applicationProcess, CoreStateDownloader downloader, Log log )
+    {
+        this( leaderLocator, applicationProcess, downloader, log,
+                new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS ).newTimeout() );
+    }
+
+    private enum State
+    {
+        INITIATED,
+        RUNNING,
+        COMPLETED
+    }
+
+    /**
+     * Pauses the applier and retries the download until it succeeds, at which point the applier is resumed.
+     * If the thread is interrupted the loop exits without resuming the applier. The state becomes
+     * {@code COMPLETED} however this method exits.
+     */
+    @Override
+    public void run()
+    {
+        state = State.RUNNING;
+        try
+        {
+            applicationProcess.pauseApplier( CoreStateDownloaderService.OPERATION_NAME );
+            // isInterrupted() instead of Thread.interrupted(): the check must not clear the interrupt flag,
+            // so that whoever owns this thread can still see that it was interrupted.
+            while ( !Thread.currentThread().isInterrupted() )
+            {
+                try
+                {
+                    downloader.downloadSnapshot( leaderLocator.getLeader() );
+                    applicationProcess.resumeApplier( CoreStateDownloaderService.OPERATION_NAME );
+                    break;
+                }
+                // NOTE(review): the messages below use "{}" placeholders; confirm that the Log implementation
+                // substitutes them rather than expecting String.format-style "%d".
+                catch ( StoreCopyFailedException e )
+                {
+                    log.error( "Failed to download snapshot. Retrying in {} ms.", timeout.getMillis(), e );
+                }
+                catch ( NoLeaderFoundException e )
+                {
+                    log.warn( "No leader found. Retrying in {} ms.", timeout.getMillis() );
+                }
+                // Only reached after a failed attempt; parkNanos returns early if the thread is interrupted.
+                LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( timeout.getMillis() ) );
+                timeout.increment();
+            }
+        }
+        finally
+        {
+            state = State.COMPLETED;
+        }
+    }
+
+    /**
+     * @return {@code true} while {@link #run()} is executing
+     */
+    boolean isRunning()
+    {
+        return state == State.RUNNING;
+    }
+
+    /**
+     * @return {@code true} once {@link #run()} has exited
+     */
+    boolean hasCompleted()
+    {
+        return state == State.COMPLETED;
+    }
+
+}
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java
new file mode 100644
index 0000000000000..4d6d849e02371
--- /dev/null
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.causalclustering.core.state.snapshot;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.neo4j.causalclustering.core.consensus.LeaderLocator;
+import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
+import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
+import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.function.Predicates;
+import org.neo4j.kernel.impl.util.CountingJobScheduler;
+import org.neo4j.kernel.impl.util.Listener;
+import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService.OPERATION_NAME;
+
+/**
+ * Tests for {@link CoreStateDownloaderService}, run against a real {@link Neo4jJobScheduler}.
+ */
+public class CoreStateDownloaderServiceTest
+{
+    private final MemberId someMember = new MemberId( UUID.randomUUID() );
+    private Neo4jJobScheduler neo4jJobScheduler;
+
+    @Before
+    public void create()
+    {
+        neo4jJobScheduler = new Neo4jJobScheduler();
+        neo4jJobScheduler.init();
+    }
+
+    @After
+    public void shutdown()
+    {
+        neo4jJobScheduler.shutdown();
+    }
+
+    @Test
+    public void shouldRunPersistentDownloader() throws Exception
+    {
+        // given a leader that is always available
+        final CommandApplicationProcess applier = mock( CommandApplicationProcess.class );
+        CoreStateDownloader stateDownloader = mock( CoreStateDownloader.class );
+        LeaderLocator locator = mock( LeaderLocator.class );
+        when( locator.getLeader() ).thenReturn( someMember );
+        CoreStateDownloaderService service =
+                new CoreStateDownloaderService( neo4jJobScheduler, stateDownloader, applier,
+                        logProvider( mock( Log.class ) ) );
+
+        // when
+        service.scheduleDownload( locator );
+        Predicates.await( () -> resumedExactlyOnce( applier ), 1, TimeUnit.SECONDS );
+
+        // then
+        verify( applier, times( 1 ) ).pauseApplier( OPERATION_NAME );
+        verify( applier, times( 1 ) ).resumeApplier( OPERATION_NAME );
+        verify( stateDownloader, times( 1 ) ).downloadSnapshot( any() );
+    }
+
+    @Test
+    public void shouldOnlyScheduleOnePersistentDownloaderTaskAtTheTime() throws Exception
+    {
+        // given a scheduler that counts schedule calls and a leader locator that cannot find a leader yet
+        AtomicInteger scheduleCount = new AtomicInteger( );
+        CountingJobScheduler countingScheduler = new CountingJobScheduler( scheduleCount, neo4jJobScheduler );
+        CoreStateDownloader stateDownloader = mock( CoreStateDownloader.class );
+        final CommandApplicationProcess applier = mock( CommandApplicationProcess.class );
+        CoreStateDownloaderService service =
+                new CoreStateDownloaderService( countingScheduler, stateDownloader, applier,
+                        logProvider( mock( Log.class ) ) );
+        AtomicBoolean leaderAvailable = new AtomicBoolean( false );
+        LeaderLocator locator = new ControllableLeaderLocator( leaderAvailable );
+
+        // when a download is requested four times in a row
+        for ( int request = 0; request < 4; request++ )
+        {
+            service.scheduleDownload( locator );
+        }
+
+        leaderAvailable.set( true );
+
+        // then
+        assertEquals( 1, scheduleCount.get() );
+    }
+
+    /**
+     * Await condition: true once the applier has been resumed exactly one time, false on any verification failure.
+     */
+    private static boolean resumedExactlyOnce( CommandApplicationProcess applier )
+    {
+        try
+        {
+            verify( applier, times( 1 ) ).resumeApplier( OPERATION_NAME );
+            return true;
+        }
+        catch ( Throwable t )
+        {
+            return false;
+        }
+    }
+
+    /**
+     * Leader locator that only hands out {@code someMember} while the supplied flag is set.
+     */
+    private class ControllableLeaderLocator implements LeaderLocator
+    {
+        private final AtomicBoolean shouldProvideALeader;
+
+        ControllableLeaderLocator( AtomicBoolean shouldProvideALeader )
+        {
+            this.shouldProvideALeader = shouldProvideALeader;
+        }
+
+        @Override
+        public MemberId getLeader() throws NoLeaderFoundException
+        {
+            if ( !shouldProvideALeader.get() )
+            {
+                throw new NoLeaderFoundException( "sorry" );
+            }
+            return someMember;
+        }
+
+        @Override
+        public void registerListener( Listener listener )
+        {
+            // do nothing
+        }
+
+        @Override
+        public void unregisterListener( Listener listener )
+        {
+            // do nothing
+        }
+    }
+
+    /**
+     * Builds a log provider that returns the given log for every request.
+     */
+    private LogProvider logProvider( Log log )
+    {
+        return new LogProvider()
+        {
+            @Override
+            public Log getLog( Class loggingClass )
+            {
+                return log;
+            }
+
+            @Override
+            public Log getLog( String name )
+            {
+                return log;
+            }
+        };
+    }
+}
diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java
index d87cc6a69cdf7..3ac0ceae7eee2 100644
--- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java
+++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java
@@ -59,7 +59,6 @@ public class CoreStateDownloaderTest
     private final CatchUpClient catchUpClient = mock( CatchUpClient.class );
     private final StoreCopyProcess storeCopyProcess = mock( StoreCopyProcess.class );
     private CoreSnapshotService snaptshotService = mock( CoreSnapshotService.class );
-    private CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
     private TopologyService topologyService = mock( TopologyService.class );
     private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class );
@@ -73,7 +72,7 @@ public class CoreStateDownloaderTest
     private final CoreStateDownloader downloader =
             new CoreStateDownloader( localDatabase, startStopLife, remoteStore, catchUpClient,
logProvider, - storeCopyProcess, coreStateMachines, snaptshotService, applicationProcess, topologyService ); + storeCopyProcess, coreStateMachines, snaptshotService, topologyService ); @Before public void commonMocking() throws IOException diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java new file mode 100644 index 0000000000000..3c2e3f91edccc --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */
+package org.neo4j.causalclustering.core.state.snapshot;
+
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
+import org.neo4j.causalclustering.core.consensus.LeaderLocator;
+import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
+import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
+import org.neo4j.causalclustering.helper.TimeoutStrategy;
+import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.function.Predicates;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.NullLogProvider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.startsWith;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService.OPERATION_NAME;
+
+/**
+ * Tests for {@link PersistentSnapshotDownloader}: pausing and resuming of the applier, and retrying on failure.
+ */
+public class PersistentSnapshotDownloaderTest
+{
+    private final MemberId someMember = new MemberId( UUID.randomUUID() );
+
+    @Test
+    public void shouldPauseAndResumeApplicationProcessIfDownloadIsSuccessful() throws Exception
+    {
+        // given
+        CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
+        final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
+        final Log log = mock( Log.class );
+        LeaderLocator leaderLocator = mock( LeaderLocator.class );
+        when( leaderLocator.getLeader() ).thenReturn( someMember );
+        PersistentSnapshotDownloader persistentSnapshotDownloader =
+                new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log );
+
+        // when
+        persistentSnapshotDownloader.run();
+
+        // then
+        verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
+        verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
+        verify( coreStateDownloader, times( 1 ) ).downloadSnapshot( any() );
+        assertFalse( persistentSnapshotDownloader.isRunning() );
+    }
+
+    @Test
+    public void shouldNotResumeCommandApplicationProcessWhileDownloadIsFailing() throws Exception
+    {
+        // given
+        CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
+        doThrow( StoreCopyFailedException.class ).when( coreStateDownloader ).downloadSnapshot( someMember );
+        final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
+        LeaderLocator leaderLocator = mock( LeaderLocator.class );
+        when( leaderLocator.getLeader() ).thenReturn( someMember );
+
+        final Log log = mock( Log.class );
+        PersistentSnapshotDownloader persistentSnapshotDownloader =
+                new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log );
+
+        // when
+        Thread thread = new Thread( persistentSnapshotDownloader );
+        thread.start();
+
+        Predicates.await( () ->
+        {
+            try
+            {
+                verify( log, atLeast( 1 ) ).error( startsWith( "Failed to download snapshot. Retrying in" )
+                        , anyInt(), any( StoreCopyFailedException.class ) );
+                return true;
+            }
+            catch ( Throwable throwable )
+            {
+                return false;
+            }
+        }, 1, TimeUnit.SECONDS );
+
+        // then
+        assertTrue( persistentSnapshotDownloader.isRunning() );
+
+        // when
+        stopDownloader( thread );
+
+        // then
+        verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
+        verify( applicationProcess, never() ).resumeApplier( OPERATION_NAME );
+    }
+
+    @Test
+    public void shouldNotResumeCommandApplicationProcessIfNoLeaderIsFound() throws Exception
+    {
+        // given
+        CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
+        final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
+        LeaderLocator leaderLocator = mock( LeaderLocator.class );
+        doThrow( NoLeaderFoundException.class ).when( leaderLocator ).getLeader();
+
+        final Log log = mock( Log.class );
+        PersistentSnapshotDownloader persistentSnapshotDownloader =
+                new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log );
+
+        // when
+        Thread thread = new Thread( persistentSnapshotDownloader );
+        thread.start();
+
+        Predicates.await( () ->
+        {
+            try
+            {
+                verify( log, atLeast( 1 ) ).warn(
+                        startsWith( "No leader found. Retrying in" ),
+                        anyInt() );
+                return true;
+            }
+            catch ( Throwable throwable )
+            {
+                return false;
+            }
+        }, 1, TimeUnit.SECONDS );
+
+        // then
+        assertTrue( persistentSnapshotDownloader.isRunning() );
+
+        // when
+        stopDownloader( thread );
+
+        // then
+        verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
+        verify( applicationProcess, never() ).resumeApplier( OPERATION_NAME );
+    }
+
+    @Test
+    public void shouldEventuallySucceed() throws Exception
+    {
+        // given
+        CoreStateDownloader coreStateDownloader = new EventuallySuccessfulDownloader( 3 );
+
+        final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
+        LeaderLocator leaderLocator = mock( LeaderLocator.class );
+        when( leaderLocator.getLeader() ).thenReturn( someMember );
+        final Log log = mock( Log.class );
+        NoTimeout timeout = new NoTimeout();
+        PersistentSnapshotDownloader persistentSnapshotDownloader =
+                new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log,
+                        timeout );
+
+        // when
+        persistentSnapshotDownloader.run();
+
+        // then
+        verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
+        verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
+        assertEquals( 3, timeout.increments );
+        assertFalse( persistentSnapshotDownloader.isRunning() );
+    }
+
+    /**
+     * Stops the downloader thread by interrupting it and waits a bounded time for it to finish.
+     * This replaces the deprecated and inherently unsafe {@code Thread.stop()}; the downloader loop exits
+     * when it finds its thread interrupted.
+     */
+    private static void stopDownloader( Thread thread ) throws InterruptedException
+    {
+        thread.interrupt();
+        thread.join( TimeUnit.SECONDS.toMillis( 5 ) );
+    }
+
+    /**
+     * Downloader whose first {@code after} download attempts fail and whose later attempts succeed.
+     */
+    private static class EventuallySuccessfulDownloader extends CoreStateDownloader
+    {
+        private int after;
+
+        private EventuallySuccessfulDownloader( int after )
+        {
+            super( null, null, null,
+                    null, NullLogProvider.getInstance(), null, null,
+                    null, null );
+            this.after = after;
+        }
+
+        @Override
+        void downloadSnapshot( MemberId source ) throws StoreCopyFailedException
+        {
+            if ( after-- > 0 )
+            {
+                throw new StoreCopyFailedException( "sorry" );
+            }
+        }
+    }
+
+    /**
+     * Timeout that never waits and only counts how many times it has been incremented.
+     */
+    private static class NoTimeout implements TimeoutStrategy.Timeout
+    {
+        private int increments = 0;
+
+        @Override
+        public long getMillis()
+        {
+            return 0;
+        }
+
+        @Override
+        public void increment()
+        {
+            increments++;
+        }
+    }
+}