Skip to content

Commit

Permalink
Cleaner error handling around store copy
Browse files Browse the repository at this point in the history
* Fatal and unknown exceptions lead to a panic.
  Typically these are life-cycle exceptions, I/O exceptions, etc.

* StoreCopyFailedException is now a proper wrapper for benign issues.
  The unnecessary StreamingTransactionsFailedException was therefore removed,
  since StoreCopyFailedException subsumes it in all places. Ideally we would
  eventually get rid of StoreCopyFailedException completely as well.

* The CoreStateDownloader now avoids exceptions for regular control flow;
  exceptions are used only for exceptional circumstances. Specifically,
  StoreCopyFailedException is not a valid outcome of the downloader. Either
  an exception with known failure-handling semantics is thrown, or a boolean
  is returned indicating whether the operation succeeded or needs to be retried.
  • Loading branch information
martinfurmanski committed Mar 29, 2018
1 parent 5c2ed79 commit f213392
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 220 deletions.
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -53,14 +52,7 @@ class BackupDelegator extends LifecycleAdapter

void copy( AdvertisedSocketAddress fromAddress, StoreId expectedStoreId, Path destDir ) throws StoreCopyFailedException
{
try
{
remoteStore.copy( new CatchupAddressProvider.SingleAddressProvider( fromAddress ), expectedStoreId, destDir.toFile() );
}
catch ( StreamingTransactionsFailedException e )
{
throw new StoreCopyFailedException( e );
}
remoteStore.copy( new CatchupAddressProvider.SingleAddressProvider( fromAddress ), expectedStoreId, destDir.toFile() );
}

CatchupResult tryCatchingUp( AdvertisedSocketAddress fromAddress, StoreId expectedStoreId, Path storeDir ) throws StoreCopyFailedException
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;

Expand Down Expand Up @@ -117,7 +116,7 @@ public void fetchStoreIdDelegatesToStoreCopyClient() throws StoreIdDownloadFaile

@Test
public void retrieveStoreDelegatesToStoreCopyService()
throws StoreCopyFailedException, StreamingTransactionsFailedException, CatchupAddressResolutionException
throws StoreCopyFailedException, CatchupAddressResolutionException
{
// given
StoreId storeId = new StoreId( 92, 5, 7, 32 );
Expand Down
Expand Up @@ -57,11 +57,11 @@ public synchronized void shutdown()
shutdown = true;
}

public synchronized void recoverCopiedStore( File tempStore ) throws StoreCopyFailedException
public synchronized void recoverCopiedStore( File tempStore ) throws DatabaseShutdownException
{
if ( shutdown )
{
throw new StoreCopyFailedException( "Abort store-copied store recovery due to database shutdown" );
throw new DatabaseShutdownException( "Abort store-copied store recovery due to database shutdown" );
}

try
Expand Down
Expand Up @@ -19,9 +19,9 @@
*/
package org.neo4j.causalclustering.catchup.storecopy;

public class StreamingTransactionsFailedException extends Exception
public class DatabaseShutdownException extends Exception
{
StreamingTransactionsFailedException( String message )
DatabaseShutdownException( String message )
{
super( message );
}
Expand Down
Expand Up @@ -119,7 +119,7 @@ public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expect
}

public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId, File destDir )
throws StoreCopyFailedException, StreamingTransactionsFailedException
throws StoreCopyFailedException
{
try
{
Expand All @@ -133,12 +133,11 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreI

// Even for cluster store copy, we still write the transaction logs into the store directory itself
// because the destination directory is temporary. We will copy them to the correct place later.
boolean keepTxLogsInStoreDir = true;
CatchupResult catchupResult =
pullTransactions( addressProvider.primary(), expectedStoreId, destDir, lastFlushedTxId, true, keepTxLogsInStoreDir );
CatchupResult catchupResult = pullTransactions( addressProvider.primary(), expectedStoreId, destDir,
lastFlushedTxId, true, true );
if ( catchupResult != SUCCESS_END_OF_STREAM )
{
throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult );
throw new StoreCopyFailedException( "Failed to pull transactions: " + catchupResult );
}
}
catch ( CatchupAddressResolutionException | IOException e )
Expand Down
Expand Up @@ -49,7 +49,7 @@ public StoreCopyProcess( FileSystemAbstraction fs, PageCache pageCache, LocalDat
}

public void replaceWithStoreFrom( CatchupAddressProvider addressProvider, StoreId expectedStoreId )
throws IOException, StoreCopyFailedException, StreamingTransactionsFailedException
throws IOException, StoreCopyFailedException, DatabaseShutdownException
{
try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, pageCache, localDatabase.storeDir() ) )
{
Expand Down
Expand Up @@ -28,10 +28,10 @@
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.catchup.storecopy.DatabaseShutdownException;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.core.consensus.schedule.Timer;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService.TimerName;
Expand Down Expand Up @@ -325,11 +325,16 @@ private void downloadDatabase( StoreId localStoreId )
new CatchupAddressProvider.UpstreamStrategyBoundAddressProvider( topologyService, selectionStrategyPipeline );
storeCopyProcess.replaceWithStoreFrom( upstreamStrategyBoundAddressProvider, localStoreId );
}
catch ( IOException | StoreCopyFailedException | StreamingTransactionsFailedException e )
catch ( IOException | StoreCopyFailedException e )
{
log.warn( "Error copying store. Will retry shortly.", e );
return;
}
catch ( DatabaseShutdownException e )
{
log.warn( "Store copy aborted due to shutdown.", e );
return;
}

try
{
Expand Down
Expand Up @@ -177,7 +177,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient );

this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader, commandApplicationProcess, logProvider,
new ExponentialBackoffStrategy( 1, 30, SECONDS ).newTimeout() );
new ExponentialBackoffStrategy( 1, 30, SECONDS ).newTimeout(), databaseHealthSupplier );

this.membershipWaiterLifecycle = createMembershipWaiterLifecycle();

Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.causalclustering.core.state;

import java.io.IOException;

import org.neo4j.causalclustering.core.consensus.RaftMachine;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.state.snapshot.CoreSnapshot;
Expand Down Expand Up @@ -63,7 +65,7 @@ public synchronized CoreSnapshot snapshot() throws Exception
}
}

public synchronized void installSnapshot( CoreSnapshot coreSnapshot ) throws Exception
public synchronized void installSnapshot( CoreSnapshot coreSnapshot ) throws IOException
{
long snapshotPrevIndex = coreSnapshot.prevIndex();
raftLog.skip( snapshotPrevIndex, coreSnapshot.prevTerm() );
Expand Down

0 comments on commit f213392

Please sign in to comment.