Skip to content

Commit

Permalink
Raft-synchronous snapshot download
Browse files Browse the repository at this point in the history
Blocks the Raft while downloading the snapshot.
  • Loading branch information
martinfurmanski committed Apr 5, 2018
1 parent a94d72e commit 1171d18
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
Expand Up @@ -19,13 +19,16 @@
*/
package org.neo4j.causalclustering.core.state;

import java.util.Optional;

import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.core.consensus.RaftMachine;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome;
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.kernel.impl.util.JobScheduler.JobHandle;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -56,7 +59,11 @@ public synchronized void handle( RaftMessages.ReceivedInstantClusterIdAwareMessa
ConsensusOutcome outcome = raftMachine.handle( wrappedMessage.message() );
if ( outcome.needsFreshSnapshot() )
{
downloadService.scheduleDownload( raftMachine );
Optional<JobHandle> downloadJob = downloadService.scheduleDownload( raftMachine );
if ( downloadJob.isPresent() )
{
downloadJob.get().waitTermination();
}
}
else
{
Expand Down
Expand Up @@ -20,9 +20,12 @@
package org.neo4j.causalclustering.core.state.snapshot;

import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import java.util.Optional;

import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.JobScheduler.JobHandle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand All @@ -37,6 +40,7 @@ public class CoreStateDownloaderService extends LifecycleAdapter
private final Log log;
private final TimeoutStrategy.Timeout downloaderPauseStrategy;
private PersistentSnapshotDownloader currentJob;
private JobHandle jobHandle;
private boolean stopped;

public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader,
Expand All @@ -51,19 +55,21 @@ public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloade
this.downloaderPauseStrategy = downloaderPauseStrategy;
}

public synchronized void scheduleDownload( LeaderLocator leaderLocator )
public synchronized Optional<JobHandle> scheduleDownload( LeaderLocator leaderLocator )
{
if ( stopped )
{
return;
return Optional.empty();
}

if ( currentJob == null || currentJob.hasCompleted() )
{
currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log,
downloaderPauseStrategy );
jobScheduler.schedule( downloadSnapshot, currentJob );
jobHandle = jobScheduler.schedule( downloadSnapshot, currentJob );
return Optional.of( jobHandle );
}
return Optional.of( jobHandle );
}

@Override
Expand Down

0 comments on commit 1171d18

Please sign in to comment.