diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java index ffbb05a8cee22..13609da88d23c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java @@ -19,6 +19,8 @@ */ package org.neo4j.causalclustering.core.state; +import java.util.Optional; + import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.RaftMessages; @@ -26,6 +28,7 @@ import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService; import org.neo4j.causalclustering.identity.ClusterId; import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; +import org.neo4j.kernel.impl.util.JobScheduler.JobHandle; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -56,7 +59,11 @@ public synchronized void handle( RaftMessages.ReceivedInstantClusterIdAwareMessa ConsensusOutcome outcome = raftMachine.handle( wrappedMessage.message() ); if ( outcome.needsFreshSnapshot() ) { - downloadService.scheduleDownload( raftMachine ); + Optional downloadJob = downloadService.scheduleDownload( raftMachine ); + if ( downloadJob.isPresent() ) + { + downloadJob.get().waitTermination(); + } } else { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java index ad6fc04b5cd5a..47a150548a04f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java @@ -20,9 +20,12 @@ package org.neo4j.causalclustering.core.state.snapshot; import org.neo4j.causalclustering.core.consensus.LeaderLocator; +import java.util.Optional; + import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.kernel.impl.util.JobScheduler; +import org.neo4j.kernel.impl.util.JobScheduler.JobHandle; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -37,6 +40,7 @@ public class CoreStateDownloaderService extends LifecycleAdapter private final Log log; private final TimeoutStrategy.Timeout downloaderPauseStrategy; private PersistentSnapshotDownloader currentJob; + private JobHandle jobHandle; private boolean stopped; public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader, @@ -51,19 +55,21 @@ public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloade this.downloaderPauseStrategy = downloaderPauseStrategy; } - public synchronized void scheduleDownload( LeaderLocator leaderLocator ) + public synchronized Optional scheduleDownload( LeaderLocator leaderLocator ) { if ( stopped ) { - return; + return Optional.empty(); } if ( currentJob == null || currentJob.hasCompleted() ) { currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log, downloaderPauseStrategy ); - jobScheduler.schedule( downloadSnapshot, currentJob ); + jobHandle = jobScheduler.schedule( downloadSnapshot, currentJob ); + return Optional.of( jobHandle ); } + return Optional.of( jobHandle ); } @Override