Skip to content

Commit

Permalink
Raft-synchronous snapshot download
Browse files Browse the repository at this point in the history
Blocks the Raft while downloading the snapshot.
  • Loading branch information
martinfurmanski committed Mar 27, 2018
1 parent d6f6bae commit 865c641
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.causalclustering.core.state;

import java.util.Optional;

import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.core.consensus.RaftMachine;
Expand All @@ -29,6 +31,7 @@
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;

public class RaftMessageApplier implements LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>>
{
Expand Down Expand Up @@ -58,7 +61,11 @@ public synchronized void handle( RaftMessages.ReceivedInstantClusterIdAwareMessa
ConsensusOutcome outcome = raftMachine.handle( wrappedMessage.message() );
if ( outcome.needsFreshSnapshot() )
{
downloadService.scheduleDownload( catchupAddressProvider );
Optional<JobScheduler.JobHandle> downloadJob = downloadService.scheduleDownload( catchupAddressProvider );
if ( downloadJob.isPresent() )
{
downloadJob.get().waitTermination();
}
}
else
{
Expand Down
Expand Up @@ -19,13 +19,16 @@
*/
package org.neo4j.causalclustering.core.state.snapshot;

import java.util.Optional;

import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler.JobHandle;

import static org.neo4j.scheduler.JobScheduler.Groups.downloadSnapshot;

Expand All @@ -37,6 +40,7 @@ public class CoreStateDownloaderService extends LifecycleAdapter
private final Log log;
private final TimeoutStrategy.Timeout downloaderPauseStrategy;
private PersistentSnapshotDownloader currentJob;
private JobHandle jobHandle;
private boolean stopped;

public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader,
Expand All @@ -51,19 +55,21 @@ public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloade
this.downloaderPauseStrategy = downloaderPauseStrategy;
}

public synchronized void scheduleDownload( CatchupAddressProvider addressProvider )
public synchronized Optional<JobHandle> scheduleDownload( CatchupAddressProvider addressProvider )
{
if ( stopped )
{
return;
return Optional.empty();
}

if ( currentJob == null || currentJob.hasCompleted() )
{
currentJob = new PersistentSnapshotDownloader( addressProvider, applicationProcess, downloader, log,
downloaderPauseStrategy );
jobScheduler.schedule( downloadSnapshot, currentJob );
jobHandle = jobScheduler.schedule( downloadSnapshot, currentJob );
return Optional.of( jobHandle );
}
return Optional.of( jobHandle );
}

@Override
Expand Down

0 comments on commit 865c641

Please sign in to comment.