Skip to content

Commit

Permalink
Core state downloader will now retry indefinitely
Browse files Browse the repository at this point in the history
When the raft message handler needs a new snapshot, it schedules the
download on the downloader service. The command application process is
paused until the download succeeds.
  • Loading branch information
RagnarW committed Nov 30, 2017
1 parent 97ff97a commit 719efaf
Show file tree
Hide file tree
Showing 8 changed files with 596 additions and 41 deletions.
Expand Up @@ -55,6 +55,7 @@
import org.neo4j.causalclustering.core.state.RaftMessageHandler;
import org.neo4j.causalclustering.core.state.machines.CoreStateMachinesModule;
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloader;
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.core.state.storage.StateStorage;
import org.neo4j.causalclustering.discovery.TopologyService;
Expand Down Expand Up @@ -177,10 +178,13 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, servicesToStopOnStoreCopy,
remoteStore, catchUpClient, logProvider, storeCopyProcess, coreStateMachinesModule.coreStateMachines,
snapshotService, commandApplicationProcess, topologyService );
snapshotService, topologyService );

CoreStateDownloaderService downloadService = new CoreStateDownloaderService( platformModule
.jobScheduler, downloader, commandApplicationProcess, logProvider );

RaftMessageHandler messageHandler = new RaftMessageHandler( localDatabase, logProvider,
consensusModule.raftMachine(), downloader, commandApplicationProcess );
consensusModule.raftMachine(), downloadService, commandApplicationProcess );

int queueSize = config.get( CausalClusteringSettings.raft_in_queue_size );
int maxBatch = config.get( CausalClusteringSettings.raft_in_queue_max_batch );
Expand Down
Expand Up @@ -22,13 +22,11 @@
import java.util.concurrent.TimeoutException;

import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.core.consensus.RaftMachine;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome;
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloader;
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand All @@ -38,19 +36,19 @@ public class RaftMessageHandler implements Inbound.MessageHandler<RaftMessages.C
private final LocalDatabase localDatabase;
private final Log log;
private final RaftMachine raftMachine;
private final CoreStateDownloader downloader;
private final CoreStateDownloaderService downloadService;
private final CommandApplicationProcess applicationProcess;

private ClusterId boundClusterId;

public RaftMessageHandler( LocalDatabase localDatabase, LogProvider logProvider,
RaftMachine raftMachine, CoreStateDownloader downloader,
RaftMachine raftMachine, CoreStateDownloaderService downloadService,
CommandApplicationProcess applicationProcess )
{
this.localDatabase = localDatabase;
this.log = logProvider.getLog( getClass() );
this.raftMachine = raftMachine;
this.downloader = downloader;
this.downloadService = downloadService;
this.applicationProcess = applicationProcess;
}

Expand All @@ -69,7 +67,7 @@ public synchronized void handle( RaftMessages.ClusterIdAwareMessage clusterIdAwa
ConsensusOutcome outcome = raftMachine.handle( clusterIdAwareMessage.message() );
if ( outcome.needsFreshSnapshot() )
{
downloadSnapshot( clusterIdAwareMessage.message().from() );
downloadService.scheduleDownload( raftMachine );
}
else
{
Expand Down Expand Up @@ -104,21 +102,4 @@ private void notifyCommitted( long commitIndex )
{
applicationProcess.notifyCommitted( commitIndex );
}

/**
* Attempts to download a fresh snapshot from another core instance.
*
* @param source The source address to attempt a download of a snapshot from.
*/
private void downloadSnapshot( MemberId source ) throws Throwable
{
try
{
downloader.downloadSnapshot( source );
}
catch ( StoreCopyFailedException e )
{
log.error( "Failed to download snapshot", e );
}
}
}
Expand Up @@ -28,7 +28,6 @@
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.causalclustering.core.state.CoreSnapshotService;
import org.neo4j.causalclustering.core.state.machines.CoreStateMachines;
import org.neo4j.causalclustering.discovery.TopologyService;
Expand All @@ -45,8 +44,6 @@

public class CoreStateDownloader
{
private static final String OPERATION_NAME = "download of snapshot";

private final LocalDatabase localDatabase;
private final Lifecycle startStopOnStoreCopy;
private final RemoteStore remoteStore;
Expand All @@ -55,14 +52,12 @@ public class CoreStateDownloader
private final StoreCopyProcess storeCopyProcess;
private final CoreStateMachines coreStateMachines;
private final CoreSnapshotService snapshotService;
private final CommandApplicationProcess applicationProcess;
private final TopologyService topologyService;

public CoreStateDownloader( LocalDatabase localDatabase, Lifecycle startStopOnStoreCopy,
RemoteStore remoteStore, CatchUpClient catchUpClient, LogProvider logProvider,
StoreCopyProcess storeCopyProcess, CoreStateMachines coreStateMachines,
CoreSnapshotService snapshotService, CommandApplicationProcess applicationProcess,
TopologyService topologyService )
CoreSnapshotService snapshotService, TopologyService topologyService )
{
this.localDatabase = localDatabase;
this.startStopOnStoreCopy = startStopOnStoreCopy;
Expand All @@ -72,13 +67,11 @@ public CoreStateDownloader( LocalDatabase localDatabase, Lifecycle startStopOnSt
this.storeCopyProcess = storeCopyProcess;
this.coreStateMachines = coreStateMachines;
this.snapshotService = snapshotService;
this.applicationProcess = applicationProcess;
this.topologyService = topologyService;
}

public void downloadSnapshot( MemberId source ) throws StoreCopyFailedException
void downloadSnapshot( MemberId source ) throws StoreCopyFailedException
{
applicationProcess.pauseApplier( OPERATION_NAME );
try
{
/* Extract some key properties before shutting it down. */
Expand Down Expand Up @@ -162,9 +155,5 @@ else if ( catchupResult != SUCCESS_END_OF_STREAM )
{
throw new StoreCopyFailedException( e );
}
finally
{
applicationProcess.resumeApplier( OPERATION_NAME );
}
}
}
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.snapshot;

import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

/**
 * Schedules snapshot downloads on the {@link JobScheduler}.
 * <p>
 * A new {@link PersistentSnapshotDownloader} job is only scheduled when there is no previous job, or when the
 * previous job reports that it has completed; otherwise the request is ignored.
 */
public class CoreStateDownloaderService
{
    /** Name under which the download job pauses and resumes the command applier. */
    static final String OPERATION_NAME = "download of snapshot";

    private final JobScheduler jobScheduler;
    private final CoreStateDownloader downloader;
    private final CommandApplicationProcess applicationProcess;
    private final Log log;
    private final JobScheduler.Group downloadSnapshotGroup;

    /** The most recently scheduled job, or {@code null} if none has been scheduled. Guarded by {@code this}. */
    private PersistentSnapshotDownloader currentJob;

    /**
     * @param jobScheduler scheduler that the download job is submitted to.
     * @param downloader performs the actual snapshot download; handed to every scheduled job.
     * @param applicationProcess the command application process; handed to every scheduled job.
     * @param logProvider provider of the log used by this service and by the jobs it schedules.
     */
    public CoreStateDownloaderService( JobScheduler jobScheduler, CoreStateDownloader downloader,
            CommandApplicationProcess applicationProcess,
            LogProvider logProvider )
    {
        this.jobScheduler = jobScheduler;
        this.downloader = downloader;
        this.applicationProcess = applicationProcess;
        this.log = logProvider.getLog( getClass() );
        this.downloadSnapshotGroup = new JobScheduler.Group( "download snapshot", POOLED );
    }

    /**
     * Schedules a snapshot download unless a previously scheduled download has not yet completed.
     * <p>
     * The whole check-and-schedule step runs under this object's monitor. The previous implementation read
     * the non-volatile {@code currentJob} field outside the lock before re-checking it inside, which gives no
     * visibility guarantee for the unsynchronized read; taking the lock unconditionally is simpler and this
     * method is not expected to be contended.
     *
     * @param leaderLocator used by the scheduled job to find the member to download the snapshot from.
     */
    public synchronized void scheduleDownload( LeaderLocator leaderLocator )
    {
        if ( currentJob == null || currentJob.hasCompleted() )
        {
            currentJob = new PersistentSnapshotDownloader( leaderLocator, applicationProcess, downloader, log );
            jobScheduler.schedule( downloadSnapshotGroup, currentJob );
        }
    }
}
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.snapshot;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.state.CommandApplicationProcess;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.logging.Log;

/**
 * A job that keeps trying to download a snapshot from the current leader until it succeeds or the executing
 * thread is interrupted.
 * <p>
 * The command applier is paused for the duration of the job and is always resumed when the job ends,
 * regardless of whether it ended by success, interruption or an unexpected exception.
 */
class PersistentSnapshotDownloader implements Runnable
{
    private final CommandApplicationProcess applicationProcess;
    private final LeaderLocator leaderLocator;
    private final CoreStateDownloader downloader;
    private final Log log;
    private final TimeoutStrategy.Timeout timeout;

    // Written by the thread executing run() and read by other threads through isRunning()/hasCompleted(),
    // hence volatile.
    private volatile State state;

    /**
     * @param leaderLocator used to look up the member to download from on every attempt.
     * @param applicationProcess the command application process that is paused while downloading.
     * @param downloader performs the actual snapshot download.
     * @param log where failed attempts are reported.
     * @param timeout supplies the delay between attempts; it is incremented after every failed attempt.
     */
    PersistentSnapshotDownloader( LeaderLocator leaderLocator,
            CommandApplicationProcess applicationProcess, CoreStateDownloader downloader, Log log,
            TimeoutStrategy.Timeout timeout )
    {
        this.applicationProcess = applicationProcess;
        this.leaderLocator = leaderLocator;
        this.downloader = downloader;
        this.log = log;
        this.timeout = timeout;
        this.state = State.INITIATED;
    }

    /**
     * Creates a downloader with an exponential back-off between attempts.
     * NOTE(review): the arguments (1, 30, SECONDS) presumably mean a 1 second initial delay capped at
     * 30 seconds - confirm against ExponentialBackoffStrategy.
     */
    PersistentSnapshotDownloader( LeaderLocator leaderLocator,
            CommandApplicationProcess applicationProcess, CoreStateDownloader downloader, Log log )
    {
        this( leaderLocator, applicationProcess, downloader, log,
                new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS ).newTimeout() );
    }

    private enum State
    {
        INITIATED,
        RUNNING,
        COMPLETED
    }

    /**
     * Pauses the applier, retries the download until it succeeds or the thread is interrupted, then resumes
     * the applier and marks this job as completed.
     */
    @Override
    public void run()
    {
        state = State.RUNNING;
        try
        {
            applicationProcess.pauseApplier( CoreStateDownloaderService.OPERATION_NAME );
            try
            {
                downloadUntilSuccessfulOrInterrupted();
            }
            finally
            {
                // Resume on every exit path. Previously the applier was only resumed after a successful
                // download, so an interrupt or an unexpected runtime exception left it paused.
                applicationProcess.resumeApplier( CoreStateDownloaderService.OPERATION_NAME );
            }
        }
        finally
        {
            state = State.COMPLETED;
        }
    }

    /**
     * Attempts the download in a loop, sleeping for the current timeout and then incrementing it after
     * every failed attempt. Returns after the first successful download, or when the interrupt flag of the
     * executing thread is found set at the start of an attempt.
     */
    private void downloadUntilSuccessfulOrInterrupted()
    {
        // Thread.interrupted() clears the flag, exactly as the original loop did.
        while ( !Thread.interrupted() )
        {
            try
            {
                downloader.downloadSnapshot( leaderLocator.getLeader() );
                return;
            }
            catch ( StoreCopyFailedException e )
            {
                // Format the message up front and pass the exception through the (message, throwable)
                // overload, so the delay is interpolated and the stack trace is not lost as a format argument.
                log.error( String.format( "Failed to download snapshot. Retrying in %d ms.",
                        timeout.getMillis() ), e );
            }
            catch ( NoLeaderFoundException e )
            {
                log.warn( String.format( "No leader found. Retrying in %d ms.", timeout.getMillis() ) );
            }
            // parkNanos returns early on interrupt without clearing the flag; the loop condition then ends
            // the job.
            LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( timeout.getMillis() ) );
            timeout.increment();
        }
    }

    /**
     * @return {@code true} from the moment {@link #run()} starts until it finishes.
     */
    boolean isRunning()
    {
        return state == State.RUNNING;
    }

    /**
     * @return {@code true} once {@link #run()} has finished, whether normally or exceptionally.
     */
    boolean hasCompleted()
    {
        return state == State.COMPLETED;
    }

}

0 comments on commit 719efaf

Please sign in to comment.