From 77f78a4644c03df0f3644b2cb79b7833c082cb61 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Tue, 5 Dec 2017 11:57:33 +0100 Subject: [PATCH] Respond to feedback from comments 1. Application process always resumes. 2. Made CoreStateDownloaderService a LifecycleAdapter. 3. Added STOPPED state in PersistentSnapshotDownloader. This state is entered when the LifecycleAdapter is stopped. 4. Do state check before running. Each PersistentSnapshotDownloader can only be run once. --- .../neo4j/kernel/impl/util/JobScheduler.java | 5 + .../core/server/CoreServerModule.java | 1 + .../snapshot/CoreStateDownloaderService.java | 30 ++-- .../PersistentSnapshotDownloader.java | 62 ++++++-- .../CoreStateDownloaderServiceTest.java | 87 +++++++++-- .../PersistentSnapshotDownloaderTest.java | 138 +++++++++++++----- 6 files changed, 243 insertions(+), 80 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java index 04260edb83b8e..81cc0e4f034a5 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/JobScheduler.java @@ -157,6 +157,11 @@ class Groups */ public static final Group metricsEvent = new Group( "MetricsEvent", POOLED ); + /** + * Snapshot downloader + */ + public static final Group downloadSnapshot = new JobScheduler.Group( "DownloadSnapshot", POOLED ); + /** * UDC timed events. 
*/ diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index d317503e9550c..31f962d1f6c1c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -236,5 +236,6 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data life.add( raftServer ); // must start before core state so that it can trigger snapshot downloads when necessary life.add( coreLife ); life.add( catchupServer ); // must start last and stop first, since it handles external requests + life.add( downloadService ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java index 3edb1f2b57c72..515085f3e2d36 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java @@ -22,21 +22,19 @@ import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.kernel.impl.util.JobScheduler; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; +import static org.neo4j.kernel.impl.util.JobScheduler.Groups.downloadSnapshot; -public class CoreStateDownloaderService +public class CoreStateDownloaderService extends 
LifecycleAdapter { - static final String OPERATION_NAME = "download of snapshot"; - private final JobScheduler jobScheduler; private final CoreStateDownloader downloader; private final CommandApplicationProcess applicationProcess; private final Log log; private PersistentSnapshotDownloader currentJob = null; - private final JobScheduler.Group downloadSnapshotGroup; public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader, CommandApplicationProcess applicationProcess, @@ -46,21 +44,23 @@ public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloade this.downloader = downloader; this.applicationProcess = applicationProcess; this.log = logProvider.getLog( getClass() ); - this.downloadSnapshotGroup = new JobScheduler.Group( "download snapshot", POOLED ); } - public void scheduleDownload( LeaderLocator leaderLocator ) + public synchronized void scheduleDownload( LeaderLocator leaderLocator ) { if ( currentJob == null || currentJob.hasCompleted() ) { - synchronized ( this ) - { - if ( currentJob == null || currentJob.hasCompleted() ) - { - currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log ); - jobScheduler.schedule( downloadSnapshotGroup, currentJob ); - } - } + currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log ); + jobScheduler.schedule( downloadSnapshot, currentJob ); + } + } + + @Override + public void stop() throws Throwable + { + if (currentJob != null) + { + currentJob.stop(); } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java index ce7047e99b2a9..9ac6fb523cadd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java +++ 
b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloader.java @@ -20,7 +20,6 @@ package org.neo4j.causalclustering.core.state.snapshot; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException; import org.neo4j.causalclustering.core.consensus.LeaderLocator; @@ -32,6 +31,8 @@ class PersistentSnapshotDownloader implements Runnable { + static final String OPERATION_NAME = "download of snapshot"; + private final CommandApplicationProcess applicationProcess; private final LeaderLocator leaderLocator; private final CoreStateDownloader downloader; @@ -62,26 +63,26 @@ private enum State { INITIATED, RUNNING, + STOPPED, COMPLETED } @Override public void run() { - state = State.RUNNING; + if ( !initialStateOk() ) + { + return; + } try { - applicationProcess.pauseApplier( CoreStateDownloaderService.OPERATION_NAME ); - while ( true ) + state = State.RUNNING; + applicationProcess.pauseApplier( OPERATION_NAME ); + while ( state == State.RUNNING ) { - if ( Thread.interrupted() ) - { - break; - } try { downloader.downloadSnapshot( leaderLocator.getLeader() ); - applicationProcess.resumeApplier( CoreStateDownloaderService.OPERATION_NAME ); break; } catch ( StoreCopyFailedException e ) @@ -92,11 +93,52 @@ public void run() { log.warn( "No leader found. 
Retrying in {} ms.", timeout.getMillis() ); } - LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( timeout.getMillis() ) ); + Thread.sleep( timeout.getMillis() ); timeout.increment(); } } + catch ( InterruptedException e ) + { + log.error( "Persistent snapshot downloader was interrupted" ); + } finally + { + applicationProcess.resumeApplier( OPERATION_NAME ); + state = State.COMPLETED; + } + } + + private boolean initialStateOk() + { + switch ( state ) + { + case INITIATED: + return true; + case RUNNING: + log.error( "Persistent snapshot downloader is already running. " + + "Illegal state '{}'. Expected '{}'", state, State.INITIATED ); + return false; + case STOPPED: + log.info( "Persistent snapshot downloader was stopped before starting" ); + return false; + case COMPLETED: + log.error( "Persistent snapshot downloader has already completed. " + + "Illegal state '{}'. Expected '{}'", state, State.INITIATED ); + return false; + default: + log.error( "Not a recognised state. " + + "Illegal state '{}'. 
Expected '{}'", state, State.INITIATED ); + return false; + } + } + + void stop() + { + if ( state == State.RUNNING ) + { + state = State.STOPPED; + } + else if ( state == State.INITIATED ) { state = State.COMPLETED; } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java index 4d6d849e02371..5e49353b8442f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderServiceTest.java @@ -25,9 +25,11 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.state.CommandApplicationProcess; @@ -41,11 +43,13 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; -import static org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService.OPERATION_NAME; +import static org.neo4j.causalclustering.core.state.snapshot.PersistentSnapshotDownloader.OPERATION_NAME; public class CoreStateDownloaderServiceTest { @@ -78,18 +82,7 @@ public void 
shouldRunPersistentDownloader() throws Exception LeaderLocator leaderLocator = mock( LeaderLocator.class ); when( leaderLocator.getLeader() ).thenReturn( someMember ); coreStateDownloaderService.scheduleDownload( leaderLocator ); - Predicates.await( () -> - { - try - { - verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); - return true; - } - catch ( Throwable t ) - { - return false; - } - }, 1, TimeUnit.SECONDS ); + waitForApplierToResume( applicationProcess ); verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); @@ -99,7 +92,7 @@ public void shouldRunPersistentDownloader() throws Exception @Test public void shouldOnlyScheduleOnePersistentDownloaderTaskAtTheTime() throws Exception { - AtomicInteger schedules = new AtomicInteger( ); + AtomicInteger schedules = new AtomicInteger(); CountingJobScheduler countingJobScheduler = new CountingJobScheduler( schedules, neo4jJobScheduler ); CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); @@ -119,7 +112,71 @@ public void shouldOnlyScheduleOnePersistentDownloaderTaskAtTheTime() throws Exce availableLeader.set( true ); - assertEquals(1, schedules.get()); + assertEquals( 1, schedules.get() ); + } + + @Test + public void shouldRunConcurrentDownloads() throws Throwable + { + // given + AtomicInteger schedules = new AtomicInteger(); + CountingJobScheduler countingJobScheduler = new CountingJobScheduler( schedules, neo4jJobScheduler ); + CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); + doThrow( StoreCopyFailedException.class ).when( coreStateDownloader ).downloadSnapshot( someMember ); + final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); + + final Log log = mock( Log.class ); + CoreStateDownloaderService 
coreStateDownloaderService = + new CoreStateDownloaderService( countingJobScheduler, coreStateDownloader, applicationProcess, + logProvider( log ) ); + + LeaderLocator leaderLocator = mock( LeaderLocator.class ); + when( leaderLocator.getLeader() ).thenReturn( someMember ); + + // when + coreStateDownloaderService.scheduleDownload( leaderLocator ); + Predicates.await( () -> + { + try + { + verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); + return true; + } + catch ( Throwable t ) + { + return false; + } + }, 1, TimeUnit.SECONDS ); + coreStateDownloaderService.stop(); + + // then + waitForApplierToResume( applicationProcess ); + + // given + doNothing().when( coreStateDownloader ).downloadSnapshot( someMember ); + + // when + coreStateDownloaderService.scheduleDownload( leaderLocator ); + waitForApplierToResume( applicationProcess ); + + //then + assertEquals( 2, schedules.get() ); + } + + private void waitForApplierToResume( CommandApplicationProcess applicationProcess ) throws TimeoutException + { + Predicates.await( () -> + { + try + { + verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); + return true; + } + catch ( Throwable t ) + { + return false; + } + }, 1, TimeUnit.SECONDS ); } private class ControllableLeaderLocator implements LeaderLocator diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java index 3c2e3f91edccc..72341c24660c9 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/PersistentSnapshotDownloaderTest.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException; import org.neo4j.causalclustering.core.consensus.LeaderLocator; @@ -38,16 +39,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.startsWith; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; -import static org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService.OPERATION_NAME; +import static org.neo4j.causalclustering.core.state.snapshot.PersistentSnapshotDownloader.OPERATION_NAME; public class PersistentSnapshotDownloaderTest { @@ -76,7 +75,7 @@ public void shouldPauseAndResumeApplicationProcessIfDownloadIsSuccessful() throw } @Test - public void shouldNotResumeCommandApplicationProcessWhileDownloadIsFailing() throws Exception + public void shouldResumeCommandApplicationProcessIsInterruptedDownloadIsFailing() throws Exception { // given CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); @@ -86,26 +85,15 @@ public void shouldNotResumeCommandApplicationProcessWhileDownloadIsFailing() thr when( leaderLocator.getLeader() ).thenReturn( someMember ); final Log log = mock( Log.class ); + NoTimeout timeout = new NoTimeout(); PersistentSnapshotDownloader persistentSnapshotDownloader = - new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log ); + new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log, + timeout ); // when Thread thread = new Thread( persistentSnapshotDownloader ); thread.start(); - - 
Predicates.await( () -> - { - try - { - verify( log, atLeast( 1 ) ).error( startsWith( "Failed to download snapshot. Retrying in" ) - , anyInt(), any( StoreCopyFailedException.class ) ); - return true; - } - catch ( Throwable throwable ) - { - return false; - } - }, 1, TimeUnit.SECONDS ); + awaitOneIteration( timeout ); // then assertTrue( persistentSnapshotDownloader.isRunning() ); @@ -115,11 +103,11 @@ public void shouldNotResumeCommandApplicationProcessWhileDownloadIsFailing() thr // then verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); - verify( applicationProcess, never() ).resumeApplier( OPERATION_NAME ); + verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); } @Test - public void shouldNotResumeCommandApplicationProcessIfNoLeaderIsFound() throws Exception + public void shouldResumeCommandApplicationProcessIfDownloadIsStopped() throws Exception { // given CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); @@ -128,37 +116,27 @@ public void shouldNotResumeCommandApplicationProcessIfNoLeaderIsFound() throws E doThrow( NoLeaderFoundException.class ).when( leaderLocator ).getLeader(); final Log log = mock( Log.class ); + NoTimeout timeout = new + NoTimeout(); PersistentSnapshotDownloader persistentSnapshotDownloader = - new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log ); + new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log, + timeout ); // when Thread thread = new Thread( persistentSnapshotDownloader ); thread.start(); - - Predicates.await( () -> - { - try - { - verify( log, atLeast( 1 ) ).warn( - startsWith( "No leader found. 
Retrying in" ), - anyInt() ); - return true; - } - catch ( Throwable throwable ) - { - return false; - } - }, 1, TimeUnit.SECONDS ); + awaitOneIteration( timeout ); // then assertTrue( persistentSnapshotDownloader.isRunning() ); // when - thread.stop(); + persistentSnapshotDownloader.stop(); + thread.join(); // then verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); - verify( applicationProcess, never() ).resumeApplier( OPERATION_NAME ); + verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); } @Test @@ -183,7 +161,87 @@ public void shouldEventuallySucceed() throws Exception verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); assertEquals( 3, timeout.increments ); - assertFalse(persistentSnapshotDownloader.isRunning()); + assertFalse( persistentSnapshotDownloader.isRunning() ); + } + + @Test + public void shouldNotStartIfAlreadyCompleted() throws Exception + { + // given + CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); + final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); + LeaderLocator leaderLocator = mock( LeaderLocator.class ); + when( leaderLocator.getLeader() ).thenReturn( someMember ); + + final Log log = mock( Log.class ); + PersistentSnapshotDownloader persistentSnapshotDownloader = + new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log ); + + persistentSnapshotDownloader.run(); + persistentSnapshotDownloader.run(); + + verify( log, times( 1 ) ) + .error( startsWith( "Persistent snapshot downloader has already completed." 
), any(), any() ); + verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); + verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); + } + + @Test + public void shouldNotStartIfCurrentlyRunning() throws Exception + { + // given + CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); + final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); + LeaderLocator leaderLocator = mock( LeaderLocator.class ); + doThrow( NoLeaderFoundException.class ).when( leaderLocator ).getLeader(); + + final Log log = mock( Log.class ); + NoTimeout timeout = new NoTimeout(); + PersistentSnapshotDownloader persistentSnapshotDownloader = + new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log, + timeout ); + + Thread thread = new Thread( persistentSnapshotDownloader ); + + // when + thread.start(); + awaitOneIteration( timeout ); + persistentSnapshotDownloader.run(); + persistentSnapshotDownloader.stop(); + thread.join(); + + verify( log, times( 1 ) ) + .error( startsWith( "Persistent snapshot downloader is already running." 
), any(), any() ); + verify( applicationProcess, times( 1 ) ).pauseApplier( OPERATION_NAME ); + verify( applicationProcess, times( 1 ) ).resumeApplier( OPERATION_NAME ); + } + + @Test + public void shouldNotStartIfStoppedBeforeRunning() throws Exception + { + // given + CoreStateDownloader coreStateDownloader = mock( CoreStateDownloader.class ); + final CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); + LeaderLocator leaderLocator = mock( LeaderLocator.class ); + doThrow( NoLeaderFoundException.class ).when( leaderLocator ).getLeader(); + + final Log log = mock( Log.class ); + PersistentSnapshotDownloader persistentSnapshotDownloader = + new PersistentSnapshotDownloader( leaderLocator, applicationProcess, coreStateDownloader, log ); + + // when + persistentSnapshotDownloader.stop(); + persistentSnapshotDownloader.run(); + + verify( log, times( 1 ) ) + .error( startsWith( "Persistent snapshot downloader has already completed.") , any(), any() ); + verify( applicationProcess, never() ).pauseApplier( OPERATION_NAME ); + verify( applicationProcess, never() ).resumeApplier( OPERATION_NAME ); + } + + private void awaitOneIteration( NoTimeout timeout ) throws TimeoutException + { + Predicates.await( () -> timeout.increments > 0, 1, TimeUnit.SECONDS ); } private class EventuallySuccessfulDownloader extends CoreStateDownloader