Skip to content

Commit

Permalink
Change state behaviour in PersistentDownloader
Browse files Browse the repository at this point in the history
Moved Stopped state to separate volatile variable keepRunning.

Make state variable volatile.

Remove isRunning method since it is only used in tests.

Synchronize method that move state from initial to running. This
is to make sure only one run is possible so that hasCompleted can be
trusted.
  • Loading branch information
RagnarW committed Dec 6, 2017
1 parent 77f78a4 commit 1e90845
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 78 deletions.
Expand Up @@ -38,7 +38,8 @@ class PersistentSnapshotDownloader implements Runnable
private final CoreStateDownloader downloader; private final CoreStateDownloader downloader;
private final Log log; private final Log log;
private final TimeoutStrategy.Timeout timeout; private final TimeoutStrategy.Timeout timeout;
private State state; private volatile State state;
private volatile boolean keepRunning;


PersistentSnapshotDownloader( LeaderLocator leaderLocator, PersistentSnapshotDownloader( LeaderLocator leaderLocator,
CommandApplicationProcess applicationProcess, CoreStateDownloader downloader, Log log, CommandApplicationProcess applicationProcess, CoreStateDownloader downloader, Log log,
Expand All @@ -50,6 +51,7 @@ class PersistentSnapshotDownloader implements Runnable
this.log = log; this.log = log;
this.timeout = timeout; this.timeout = timeout;
this.state = State.INITIATED; this.state = State.INITIATED;
this.keepRunning = true;
} }


PersistentSnapshotDownloader( LeaderLocator leaderLocator, PersistentSnapshotDownloader( LeaderLocator leaderLocator,
Expand All @@ -63,22 +65,21 @@ private enum State
{ {
INITIATED, INITIATED,
RUNNING, RUNNING,
STOPPED,
COMPLETED COMPLETED
} }


@Override @Override
public void run() public void run()
{ {
if ( !initialStateOk() ) if ( !moveToRunningState() )
{ {
return; return;
} }

try try
{ {
state = State.RUNNING;
applicationProcess.pauseApplier( OPERATION_NAME ); applicationProcess.pauseApplier( OPERATION_NAME );
while ( state == State.RUNNING ) while ( keepRunning )
{ {
try try
{ {
Expand Down Expand Up @@ -108,50 +109,26 @@ public void run()
} }
} }


private boolean initialStateOk() private synchronized boolean moveToRunningState()
{ {
switch ( state ) if ( state != State.INITIATED )
{ {
case INITIATED:
return true;
case RUNNING:
log.error( "Persistent snapshot downloader is already running. " +
"Illegal state '{}'. Expected '{}'", state, State.INITIATED );
return false;
case STOPPED:
log.info( "Persistent snapshot downloader was stopped before starting" );
return false;
case COMPLETED:
log.error( "Persistent snapshot downloader has already completed. " +
"Illegal state '{}'. Expected '{}'", state, State.INITIATED );
return false;
default:
log.error( "Not a recognised state. " +
"Illegal state '{}'. Expected '{}'", state, State.INITIATED );
return false; return false;
} }
} else

void stop()
{
if ( state == State.RUNNING )
{
state = State.STOPPED;
}
else if ( state == State.INITIATED )
{ {
state = State.COMPLETED; state = State.RUNNING;
return true;
} }
} }


boolean isRunning() void stop()
{ {
return state == State.RUNNING; this.keepRunning = false;
} }


boolean hasCompleted() boolean hasCompleted()
{ {
return state == State.COMPLETED; return state == State.COMPLETED;
} }

} }
Expand Up @@ -71,11 +71,11 @@ public void shouldPauseAndResumeApplicationProcessIfDownloadIsSuccessful() throw
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
verify( coreStateDownloader, times( 1 ) ).downloadSnapshot( any() ); verify( coreStateDownloader, times( 1 ) ).downloadSnapshot( any() );
assertFalse( persistentSnapshotDownloader.isRunning() ); assertTrue( persistentSnapshotDownloader.hasCompleted() );
} }


@Test @Test
public void shouldResumeCommandApplicationProcessIsInterruptedDownloadIsFailing() throws Exception public void shouldResumeCommandApplicationProcessIfInterrupted() throws Exception
{ {
// given // given
CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
Expand All @@ -94,20 +94,16 @@ public void shouldResumeCommandApplicationProcessIsInterruptedDownloadIsFailing(
Thread thread = new Thread( persistentSnapshotDownloader ); Thread thread = new Thread( persistentSnapshotDownloader );
thread.start(); thread.start();
awaitOneIteration( timeout ); awaitOneIteration( timeout );

// then
assertTrue( persistentSnapshotDownloader.isRunning() );

// when
thread.stop(); thread.stop();


// then // then
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
assertTrue( persistentSnapshotDownloader.hasCompleted() );
} }


@Test @Test
public void shouldResumeCommandApplicationProcessIfDownloadIsStopped() throws Exception public void shouldResumeCommandApplicationProcessIfDownloaderIsStopped() throws Exception
{ {
// given // given
CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
Expand All @@ -126,17 +122,13 @@ public void shouldResumeCommandApplicationProcessIfDownloadIsStopped() throws Ex
Thread thread = new Thread( persistentSnapshotDownloader ); Thread thread = new Thread( persistentSnapshotDownloader );
thread.start(); thread.start();
awaitOneIteration( timeout ); awaitOneIteration( timeout );

// then
assertTrue( persistentSnapshotDownloader.isRunning() );

// when
persistentSnapshotDownloader.stop(); persistentSnapshotDownloader.stop();
thread.join(); thread.join();


// then // then
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
assertTrue( persistentSnapshotDownloader.hasCompleted() );
} }


@Test @Test
Expand All @@ -161,11 +153,11 @@ public void shouldEventuallySucceed() throws Exception
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
assertEquals( 3, timeout.increments ); assertEquals( 3, timeout.increments );
assertFalse( persistentSnapshotDownloader.isRunning() ); assertTrue( persistentSnapshotDownloader.hasCompleted() );
} }


@Test @Test
public void shouldNotStartIfAlreadyCompleted() throws Exception public void shouldNotStartDownloadIfAlreadyCompleted() throws Exception
{ {
// given // given
CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
Expand All @@ -177,11 +169,12 @@ public void shouldNotStartIfAlreadyCompleted() throws Exception
PersistentSnapshotDownloader persistentSnapshotDownloader = PersistentSnapshotDownloader persistentSnapshotDownloader =
new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log ); new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log );


// when
persistentSnapshotDownloader.run(); persistentSnapshotDownloader.run();
persistentSnapshotDownloader.run(); persistentSnapshotDownloader.run();


verify( log, times( 1 ) ) // then
.error( startsWith( "Persistent snapshot downloader has already completed." ), any(), any() ); verify( coreStateDownloader, times(1) ).downloadSnapshot( someMember );
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
} }
Expand Down Expand Up @@ -210,35 +203,11 @@ public void shouldNotStartIfCurrentlyRunning() throws Exception
persistentSnapshotDownloader.stop(); persistentSnapshotDownloader.stop();
thread.join(); thread.join();


verify( log, times( 1 ) ) // then
.error( startsWith( "Persistent snapshot downloader is already running." ), any(), any() );
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
} }


@Test
public void shouldNotStartIfStoppedBeforeRunning() throws Exception
{
// given
CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
LeaderLocator leaderLocator = mock( LeaderLocator.class );
doThrow( NoLeaderFoundException.class ).when( leaderLocator ).getLeader();

final Log log = mock( Log.class );
PersistentSnapshotDownloader persistentSnapshotDownloader =
new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log );

// when
persistentSnapshotDownloader.stop();
persistentSnapshotDownloader.run();

verify( log, times( 1 ) )
.error( startsWith( "Persistent snapshot downloader has already completed.") , any(), any() );
verify( applicationProcess, never() ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, never() ).resumeApplier( OPERATION_NAME );
}

private void awaitOneIteration( NoTimeout timeout ) throws TimeoutException private void awaitOneIteration( NoTimeout timeout ) throws TimeoutException
{ {
Predicates.await( () -> timeout.increments > 0, 1, TimeUnit.SECONDS ); Predicates.await( () -> timeout.increments > 0, 1, TimeUnit.SECONDS );
Expand Down

0 comments on commit 1e90845

Please sign in to comment.