From 865c6419c4378bbf392d27b12b26c095c01750ae Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Tue, 27 Mar 2018 20:24:42 +0200 Subject: [PATCH] Raft-synchronous snapshot download Blocks the Raft while downloading the snapshot. --- .../core/state/RaftMessageApplier.java | 9 ++++++++- .../state/snapshot/CoreStateDownloaderService.java | 12 +++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java index 69ea83a55a271..a6cdd4fd84f12 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/RaftMessageApplier.java @@ -19,6 +19,8 @@ */ package org.neo4j.causalclustering.core.state; +import java.util.Optional; + import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.consensus.RaftMachine; @@ -29,6 +31,7 @@ import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler; public class RaftMessageApplier implements LifecycleMessageHandler> { @@ -58,7 +61,11 @@ public synchronized void handle( RaftMessages.ReceivedInstantClusterIdAwareMessa ConsensusOutcome outcome = raftMachine.handle( wrappedMessage.message() ); if ( outcome.needsFreshSnapshot() ) { - downloadService.scheduleDownload( catchupAddressProvider ); + Optional downloadJob = downloadService.scheduleDownload( catchupAddressProvider ); + if ( downloadJob.isPresent() ) + { + downloadJob.get().waitTermination(); + } } else { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java index c64952aab7dfd..923a590c08a16 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderService.java @@ -19,6 +19,8 @@ */ package org.neo4j.causalclustering.core.state.snapshot; +import java.util.Optional; + import org.neo4j.causalclustering.catchup.CatchupAddressProvider; import org.neo4j.causalclustering.core.state.CommandApplicationProcess; import org.neo4j.causalclustering.helper.TimeoutStrategy; @@ -26,6 +28,7 @@ import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler.JobHandle; import static org.neo4j.scheduler.JobScheduler.Groups.downloadSnapshot; @@ -37,6 +40,7 @@ public class CoreStateDownloaderService extends LifecycleAdapter private final Log log; private final TimeoutStrategy.Timeout downloaderPauseStrategy; private PersistentSnapshotDownloader currentJob; + private JobHandle jobHandle; private boolean stopped; public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader, @@ -51,19 +55,21 @@ public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloade this.downloaderPauseStrategy = downloaderPauseStrategy; } - public synchronized void scheduleDownload( CatchupAddressProvider addressProvider ) + public synchronized Optional scheduleDownload( CatchupAddressProvider addressProvider ) { if ( stopped ) { - return; + return Optional.empty(); } if ( currentJob == null || currentJob.hasCompleted() ) { currentJob = new PersistentSnapshotDownloader( addressProvider, applicationProcess, downloader, log, downloaderPauseStrategy ); - jobScheduler.schedule( downloadSnapshot, currentJob ); + jobHandle = jobScheduler.schedule( downloadSnapshot, currentJob ); + return Optional.of( jobHandle ); } + return Optional.of( jobHandle ); } @Override