Skip to content

Commit

Permalink
Respond to feedback from comments
Browse files Browse the repository at this point in the history
1. Application process always resume.
2. Made CoreStateDownloaderService a LifecycleApplication
3. Added STOPPED state in PersistentSnapshotDownloader. This is called
when LifecycleAdaptor is stopped.
4. Do state check before running. Each PersistentSnapshotDownloader
has run one time only logic.
  • Loading branch information
RagnarW committed Dec 5, 2017
1 parent 719efaf commit 77f78a4
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 80 deletions.
Expand Up @@ -157,6 +157,11 @@ class Groups
*/ */
public static final Group metricsEvent = new Group( "MetricsEvent", POOLED ); public static final Group metricsEvent = new Group( "MetricsEvent", POOLED );


/**
* Snapshot downloader
*/
public static final Group downloadSnapshot = new JobScheduler.Group( "DownloadSnapshot", POOLED );

/** /**
* UDC timed events. * UDC timed events.
*/ */
Expand Down
Expand Up @@ -236,5 +236,6 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
life.add( raftServer ); // must start before core state so that it can trigger snapshot downloads when necessary life.add( raftServer ); // must start before core state so that it can trigger snapshot downloads when necessary
life.add( coreLife ); life.add( coreLife );
life.add( catchupServer ); // must start last and stop first, since it handles external requests life.add( catchupServer ); // must start last and stop first, since it handles external requests
life.add( downloadService );
} }
} }
Expand Up @@ -22,21 +22,19 @@
import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; import static org.neo4j.kernel.impl.util.JobScheduler.Groups.downloadSnapshot;


public class CoreStateDownloaderService public class CoreStateDownloaderService extends LifecycleAdapter
{ {
static final String OPERATION_NAME = "download of snapshot";

private final JobScheduler jobScheduler; private final JobScheduler jobScheduler;
private final CoreStateDownloader downloader; private final CoreStateDownloader downloader;
private final CommandApplicationProcess applicationProcess; private final CommandApplicationProcess applicationProcess;
private final Log log; private final Log log;
private PersistentSnapshotDownloader currentJob = null; private PersistentSnapshotDownloader currentJob = null;
private final JobScheduler.Group downloadSnapshotGroup;


public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader, public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader,
CommandApplicationProcess applicationProcess, CommandApplicationProcess applicationProcess,
Expand All @@ -46,21 +44,23 @@ public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloade
this.downloader = downloader; this.downloader = downloader;
this.applicationProcess = applicationProcess; this.applicationProcess = applicationProcess;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.downloadSnapshotGroup = new JobScheduler.Group( "download snapshot", POOLED );
} }


public void scheduleDownload( LeaderLocator leaderLocator ) public synchronized void scheduleDownload( LeaderLocator leaderLocator )
{ {
if ( currentJob == null || currentJob.hasCompleted() ) if ( currentJob == null || currentJob.hasCompleted() )
{ {
synchronized ( this ) currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log );
{ jobScheduler.schedule( downloadSnapshot, currentJob );
if ( currentJob == null || currentJob.hasCompleted() ) }
{ }
currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log );
jobScheduler.schedule( downloadSnapshotGroup, currentJob ); @Override
} public void stop() throws Throwable
} {
if (currentJob != null)
{
currentJob.stop();
} }
} }
} }
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.causalclustering.core.state.snapshot; package org.neo4j.causalclustering.core.state.snapshot;


import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;


import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.LeaderLocator;
Expand All @@ -32,6 +31,8 @@


class PersistentSnapshotDownloader implements Runnable class PersistentSnapshotDownloader implements Runnable
{ {
static final String OPERATION_NAME = "download of snapshot";

private final CommandApplicationProcess applicationProcess; private final CommandApplicationProcess applicationProcess;
private final LeaderLocator leaderLocator; private final LeaderLocator leaderLocator;
private final CoreStateDownloader downloader; private final CoreStateDownloader downloader;
Expand Down Expand Up @@ -62,26 +63,26 @@ private enum State
{ {
INITIATED, INITIATED,
RUNNING, RUNNING,
STOPPED,
COMPLETED COMPLETED
} }


@Override @Override
public void run() public void run()
{ {
state = State.RUNNING; if ( !initialStateOk() )
{
return;
}
try try
{ {
applicationProcess.pauseApplier( CoreStateDownloaderService.OPERATION_NAME ); state = State.RUNNING;
while ( true ) applicationProcess.pauseApplier( OPERATION_NAME );
while ( state == State.RUNNING )
{ {
if ( Thread.interrupted() )
{
break;
}
try try
{ {
downloader.downloadSnapshot( leaderLocator.getLeader() ); downloader.downloadSnapshot( leaderLocator.getLeader() );
applicationProcess.resumeApplier( CoreStateDownloaderService.OPERATION_NAME );
break; break;
} }
catch ( StoreCopyFailedException e ) catch ( StoreCopyFailedException e )
Expand All @@ -92,11 +93,52 @@ public void run()
{ {
log.warn( "No leader found. Retrying in {} ms.", timeout.getMillis() ); log.warn( "No leader found. Retrying in {} ms.", timeout.getMillis() );
} }
LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( timeout.getMillis() ) ); Thread.sleep( timeout.getMillis() );
timeout.increment(); timeout.increment();
} }
} }
catch ( InterruptedException e )
{
log.error( "Persistent snapshot downloader was interrupted" );
}
finally finally
{
applicationProcess.resumeApplier( OPERATION_NAME );
state = State.COMPLETED;
}
}

private boolean initialStateOk()
{
switch ( state )
{
case INITIATED:
return true;
case RUNNING:
log.error( "Persistent snapshot downloader is already running. " +
"Illegal state '{}'. Expected '{}'", state, State.INITIATED );
return false;
case STOPPED:
log.info( "Persistent snapshot downloader was stopped before starting" );
return false;
case COMPLETED:
log.error( "Persistent snapshot downloader has already completed. " +
"Illegal state '{}'. Expected '{}'", state, State.INITIATED );
return false;
default:
log.error( "Not a recognised state. " +
"Illegal state '{}'. Expected '{}'", state, State.INITIATED );
return false;
}
}

void stop()
{
if ( state == State.RUNNING )
{
state = State.STOPPED;
}
else if ( state == State.INITIATED )
{ {
state = State.COMPLETED; state = State.COMPLETED;
} }
Expand Down
Expand Up @@ -25,9 +25,11 @@


import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
Expand All @@ -41,11 +43,13 @@


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.mockito.internal.verification.VerificationModeFactory.times;
import static org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService.OPERATION_NAME; import static org.neo4j.causalclustering.core.state.snapshot.PersistentSnapshotDownloader.OPERATION_NAME;


public class CoreStateDownloaderServiceTest public class CoreStateDownloaderServiceTest
{ {
Expand Down Expand Up @@ -78,18 +82,7 @@ public void shouldRunPersistentDownloader() throws Exception
LeaderLocator leaderLocator = mock( LeaderLocator.class ); LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( someMember ); when( leaderLocator.getLeader() ).thenReturn( someMember );
coreStateDownloaderService.scheduleDownload( leaderLocator ); coreStateDownloaderService.scheduleDownload( leaderLocator );
Predicates.await( () -> waitForApplierToResume( applicationProcess );
{
try
{
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
return true;
}
catch ( Throwable t )
{
return false;
}
}, 1, TimeUnit.SECONDS );


verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
Expand All @@ -99,7 +92,7 @@ public void shouldRunPersistentDownloader() throws Exception
@Test @Test
public void shouldOnlyScheduleOnePersistentDownloaderTaskAtTheTime() throws Exception public void shouldOnlyScheduleOnePersistentDownloaderTaskAtTheTime() throws Exception
{ {
AtomicInteger schedules = new AtomicInteger( ); AtomicInteger schedules = new AtomicInteger();
CountingJobScheduler countingJobScheduler = new CountingJobScheduler( schedules, neo4jJobScheduler ); CountingJobScheduler countingJobScheduler = new CountingJobScheduler( schedules, neo4jJobScheduler );
CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );
Expand All @@ -119,7 +112,71 @@ public void shouldOnlyScheduleOnePersistentDownloaderTaskAtTheTime() throws Exce


availableLeader.set( true ); availableLeader.set( true );


assertEquals(1, schedules.get()); assertEquals( 1, schedules.get() );
}

@Test
public void shouldRunConcurrentDownloads() throws Throwable
{
// given
AtomicInteger schedules = new AtomicInteger();
CountingJobScheduler countingJobScheduler = new CountingJobScheduler( schedules, neo4jJobScheduler );
CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class );
doThrow( StoreCopyFailedException.class ).when( coreStateDownloader ).downloadSnapshot( someMember );
final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class );

final Log log = mock( Log.class );
CoreStateDownloaderService coreStateDownloaderService =
new CoreStateDownloaderService( countingJobScheduler, coreStateDownloader, applicationProcess,
logProvider( log ) );

LeaderLocator leaderLocator = mock( LeaderLocator.class );
when( leaderLocator.getLeader() ).thenReturn( someMember );

// when
coreStateDownloaderService.scheduleDownload( leaderLocator );
Predicates.await( () ->
{
try
{
verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME );
return true;
}
catch ( Throwable t )
{
return false;
}
}, 1, TimeUnit.SECONDS );
coreStateDownloaderService.stop();

// then
waitForApplierToResume( applicationProcess );

// given
doNothing().when( coreStateDownloader ).downloadSnapshot( someMember );

// when
coreStateDownloaderService.scheduleDownload( leaderLocator );
waitForApplierToResume( applicationProcess );

//then
assertEquals( 2, schedules.get() );
}

private void waitForApplierToResume( CommandApplicationProcess applicationProcess ) throws TimeoutException
{
Predicates.await( () ->
{
try
{
verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME );
return true;
}
catch ( Throwable t )
{
return false;
}
}, 1, TimeUnit.SECONDS );
} }


private class ControllableLeaderLocator implements LeaderLocator private class ControllableLeaderLocator implements LeaderLocator
Expand Down

0 comments on commit 77f78a4

Please sign in to comment.