Skip to content

Commit

Permalink
improve error reporting and exception handling in read replica startup
Browse files Browse the repository at this point in the history
Apart from internal code structure, this improves troubleshooting for
a user where a clean reason for a failed startup is visible in neo4j.log
and thus easily actionable.
  • Loading branch information
martinfurmanski committed Jan 11, 2017
1 parent 2b1b5bf commit 8d82196
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 33 deletions.
Expand Up @@ -32,8 +32,8 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
Expand Down Expand Up @@ -251,7 +251,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

life.add( new ReadReplicaStartupProcess( platformModule.fileSystem, storeFetcher, localDatabase, txPulling,
new ConnectToRandomCoreMember( discoveryService ),
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider, copiedStoreRecovery ) );
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider,
platformModule.logging.getUserLogProvider(), copiedStoreRecovery ) );

dependencies.satisfyDependency( createSessionTracker() );
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.causalclustering.readreplica;

import java.io.IOException;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
Expand All @@ -41,18 +42,23 @@

class ReadReplicaStartupProcess implements Lifecycle
{
private static final int MAX_ATTEMPTS = 5;

private final FileSystemAbstraction fs;
private final StoreFetcher storeFetcher;
private final LocalDatabase localDatabase;
private final Lifecycle txPulling;
private final CoreMemberSelectionStrategy connectionStrategy;
private final Log log;
private final Log debugLog;
private final Log userLog;
private final RetryStrategy.Timeout timeout;

private final CopiedStoreRecovery copiedStoreRecovery;
private String lastIssue;

ReadReplicaStartupProcess( FileSystemAbstraction fs, StoreFetcher storeFetcher, LocalDatabase localDatabase,
Lifecycle txPulling, CoreMemberSelectionStrategy connectionStrategy, RetryStrategy retryStrategy,
LogProvider logProvider, CopiedStoreRecovery copiedStoreRecovery )
LogProvider debugLogProvider, LogProvider userLogProvider, CopiedStoreRecovery copiedStoreRecovery )
{
this.fs = fs;
this.storeFetcher = storeFetcher;
Expand All @@ -61,7 +67,8 @@ class ReadReplicaStartupProcess implements Lifecycle
this.connectionStrategy = connectionStrategy;
this.copiedStoreRecovery = copiedStoreRecovery;
this.timeout = retryStrategy.newTimeout();
this.log = logProvider.getLog( getClass() );
this.debugLog = debugLogProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
}

@Override
Expand All @@ -71,33 +78,38 @@ public void init() throws Throwable
txPulling.init();
}

private String issueOf( String operation, int attempt )
{
return format( "Failed attempt %d of %s", attempt, operation );
}

@Override
public void start() throws Throwable
public void start() throws IOException
{
long retryInterval = 5_000;
int attempts = 0;
while ( attempts++ < 5 )
boolean syncedWithCore = false;
for ( int attempt = 1; attempt <= MAX_ATTEMPTS && !syncedWithCore; attempt++ )
{
MemberId source = findCoreMemberToCopyFrom();
try
{
tryToStart( source );
return;
syncStoreWithCore( source );
syncedWithCore = true;
}
catch ( StoreCopyFailedException e )
{
log.info( "Attempt #%d to start read replica failed while copying store files from %s.",
attempts,
source );
lastIssue = issueOf( format( "copying store files from %s", source ), attempt );
debugLog.warn( lastIssue );
}
catch ( StreamingTransactionsFailedException e )
{
log.info( "Attempt #%d to start read replica failed while streaming transactions from %s.", attempts,
source );
lastIssue = issueOf( format( "streaming transactions from %s", source ), attempt );
debugLog.warn( lastIssue );
}
catch ( StoreIdDownloadFailedException e )
{
log.info( "Attempt #%d to start read replica failed while getting store id from %s.", attempts, source );
lastIssue = issueOf( format( "getting store id from %s", source ), attempt );
debugLog.warn( lastIssue );
}

try
Expand All @@ -108,35 +120,52 @@ public void start() throws Throwable
catch ( InterruptedException e )
{
Thread.interrupted();
throw new RuntimeException( "Interrupted while trying to start read replica.", e );
lastIssue = "Interrupted while trying to start read replica";
debugLog.warn( lastIssue );
break;
}

attempt++;
}

if ( !syncedWithCore )
{
userLog.error( lastIssue );
throw new RuntimeException( lastIssue );
}

try
{
localDatabase.start();
txPulling.start();
}
catch ( Throwable e )
{
throw new RuntimeException( e );
}
throw new Exception( "Failed to start read replica after " + (attempts - 1) + " attempts" );
}

private void tryToStart( MemberId source ) throws Throwable
private void syncStoreWithCore( MemberId source ) throws IOException, StoreIdDownloadFailedException,
StoreCopyFailedException, StreamingTransactionsFailedException
{
if ( localDatabase.isEmpty() )
{
log.info( "Local database is empty, attempting to replace with copy from core server %s", source );
debugLog.info( "Local database is empty, attempting to replace with copy from core server %s", source );

log.info( "Finding store id of core server %s", source );
debugLog.info( "Finding store id of core server %s", source );
StoreId storeId = storeFetcher.getStoreIdOf( source );

log.info( "Copying store from core server %s", source );
debugLog.info( "Copying store from core server %s", source );
localDatabase.delete();
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log )
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, debugLog )
.copyWholeStoreFrom( source, storeId, storeFetcher );

log.info( "Restarting local database after copy.", source );
debugLog.info( "Restarting local database after copy.", source );
}
else
{
ensureSameStoreIdAs( source );
}

localDatabase.start();
txPulling.start();
}

private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFailedException
Expand All @@ -158,12 +187,12 @@ private MemberId findCoreMemberToCopyFrom()
try
{
MemberId memberId = connectionStrategy.coreMember();
log.info( "Server starting, connecting to core server %s", memberId );
debugLog.info( "Server starting, connecting to core server %s", memberId );
return memberId;
}
catch ( CoreMemberSelectionException ex )
{
log.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() );
debugLog.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() );
LockSupport.parkUntil( timeout.getMillis() + System.currentTimeMillis() );
timeout.increment();
}
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void shouldReplaceEmptyStoreWithRemote() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
readReplicaStartupProcess.start();
Expand All @@ -105,7 +105,7 @@ public void shouldNotStartWithMismatchedNonEmptyStore() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
try
Expand Down Expand Up @@ -135,7 +135,7 @@ public void shouldStartWithMatchingDatabase() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
readReplicaStartupProcess.start();
Expand All @@ -155,7 +155,7 @@ public void stopShouldStopTheDatabaseAndStopPolling() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );
readReplicaStartupProcess.start();

// when
Expand Down

0 comments on commit 8d82196

Please sign in to comment.