Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.1' into 3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jan 16, 2017
2 parents 6591e62 + d43c7b3 commit a7ca610
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
progressTracker = new ProgressTrackerImpl( myGlobalSession );

replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool,
progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ), platformModule.availabilityGuard ) );

progressTracker, new ExponentialBackoffStrategy( 10, 60, SECONDS ),
platformModule.availabilityGuard ) );
}

public RaftReplicator getReplicator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@

public class ExponentialBackoffStrategy implements RetryStrategy
{
protected final long initialBackoffTimeMillis;
private final long initialBackoffTimeMillis;
private final long upperBoundBackoffTimeMillis;

public ExponentialBackoffStrategy( long initialBackoffTime, TimeUnit timeUnit )
public ExponentialBackoffStrategy( long initialBackoffTime, long upperBoundBackoffTime, TimeUnit timeUnit )
{
initialBackoffTimeMillis = timeUnit.toMillis( initialBackoffTime );
assert initialBackoffTime <= upperBoundBackoffTime;

this.initialBackoffTimeMillis = timeUnit.toMillis( initialBackoffTime );
this.upperBoundBackoffTimeMillis = timeUnit.toMillis( upperBoundBackoffTime );
}

@Override
Expand All @@ -46,7 +50,7 @@ public long getMillis()
@Override
public void increment()
{
backoffTimeMillis = backoffTimeMillis * 2;
backoffTimeMillis = Math.min( backoffTimeMillis * 2, upperBoundBackoffTimeMillis );
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
Expand Down Expand Up @@ -251,7 +251,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

life.add( new ReadReplicaStartupProcess( platformModule.fileSystem, storeFetcher, localDatabase, txPulling,
new ConnectToRandomCoreMember( discoveryService ),
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider, copiedStoreRecovery ) );
new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS ), logProvider,
platformModule.logging.getUserLogProvider(), copiedStoreRecovery ) );

dependencies.satisfyDependency( createSessionTracker() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.causalclustering.readreplica;

import java.io.IOException;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
Expand All @@ -41,27 +42,33 @@

class ReadReplicaStartupProcess implements Lifecycle
{
private static final int MAX_ATTEMPTS = 5;

private final FileSystemAbstraction fs;
private final StoreFetcher storeFetcher;
private final LocalDatabase localDatabase;
private final Lifecycle txPulling;
private final CoreMemberSelectionStrategy connectionStrategy;
private final Log log;
private final RetryStrategy.Timeout timeout;
private final Log debugLog;
private final Log userLog;

private final RetryStrategy retryStrategy;
private final CopiedStoreRecovery copiedStoreRecovery;
private String lastIssue;

ReadReplicaStartupProcess( FileSystemAbstraction fs, StoreFetcher storeFetcher, LocalDatabase localDatabase,
Lifecycle txPulling, CoreMemberSelectionStrategy connectionStrategy, RetryStrategy retryStrategy,
LogProvider logProvider, CopiedStoreRecovery copiedStoreRecovery )
LogProvider debugLogProvider, LogProvider userLogProvider, CopiedStoreRecovery copiedStoreRecovery )
{
this.fs = fs;
this.storeFetcher = storeFetcher;
this.localDatabase = localDatabase;
this.txPulling = txPulling;
this.connectionStrategy = connectionStrategy;
this.retryStrategy = retryStrategy;
this.copiedStoreRecovery = copiedStoreRecovery;
this.timeout = retryStrategy.newTimeout();
this.log = logProvider.getLog( getClass() );
this.debugLog = debugLogProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
}

@Override
Expand All @@ -71,72 +78,94 @@ public void init() throws Throwable
txPulling.init();
}

private String issueOf( String operation, int attempt )
{
return format( "Failed attempt %d of %s", attempt, operation );
}

@Override
public void start() throws Throwable
public void start() throws IOException
{
long retryInterval = 5_000;
int attempts = 0;
while ( attempts++ < 5 )
boolean syncedWithCore = false;
RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
for ( int attempt = 1; attempt <= MAX_ATTEMPTS && !syncedWithCore; attempt++ )
{
MemberId source = findCoreMemberToCopyFrom();
try
{
tryToStart( source );
return;
syncStoreWithCore( source );
syncedWithCore = true;
}
catch ( StoreCopyFailedException e )
{
log.info( "Attempt #%d to start read replica failed while copying store files from %s.",
attempts,
source );
lastIssue = issueOf( format( "copying store files from %s", source ), attempt );
debugLog.warn( lastIssue );
}
catch ( StreamingTransactionsFailedException e )
{
log.info( "Attempt #%d to start read replica failed while streaming transactions from %s.", attempts,
source );
lastIssue = issueOf( format( "streaming transactions from %s", source ), attempt );
debugLog.warn( lastIssue );
}
catch ( StoreIdDownloadFailedException e )
{
log.info( "Attempt #%d to start read replica failed while getting store id from %s.", attempts, source );
lastIssue = issueOf( format( "getting store id from %s", source ), attempt );
debugLog.warn( lastIssue );
}

try
{
Thread.sleep( retryInterval );
retryInterval = Math.min( 60_000, retryInterval * 2 );
Thread.sleep( timeout.getMillis() );
timeout.increment();
}
catch ( InterruptedException e )
{
Thread.interrupted();
throw new RuntimeException( "Interrupted while trying to start read replica.", e );
lastIssue = "Interrupted while trying to start read replica";
debugLog.warn( lastIssue );
break;
}

attempt++;
}

if ( !syncedWithCore )
{
userLog.error( lastIssue );
throw new RuntimeException( lastIssue );
}

try
{
localDatabase.start();
txPulling.start();
}
catch ( Throwable e )
{
throw new RuntimeException( e );
}
throw new Exception( "Failed to start read replica after " + (attempts - 1) + " attempts" );
}

private void tryToStart( MemberId source ) throws Throwable
private void syncStoreWithCore( MemberId source ) throws IOException, StoreIdDownloadFailedException,
StoreCopyFailedException, StreamingTransactionsFailedException
{
if ( localDatabase.isEmpty() )
{
log.info( "Local database is empty, attempting to replace with copy from core server %s", source );
debugLog.info( "Local database is empty, attempting to replace with copy from core server %s", source );

log.info( "Finding store id of core server %s", source );
debugLog.info( "Finding store id of core server %s", source );
StoreId storeId = storeFetcher.getStoreIdOf( source );

log.info( "Copying store from core server %s", source );
debugLog.info( "Copying store from core server %s", source );
localDatabase.delete();
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, log )
new CopyStoreSafely( fs, localDatabase, copiedStoreRecovery, debugLog )
.copyWholeStoreFrom( source, storeId, storeFetcher );

log.info( "Restarting local database after copy.", source );
debugLog.info( "Restarting local database after copy.", source );
}
else
{
ensureSameStoreIdAs( source );
}

localDatabase.start();
txPulling.start();
}

private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFailedException
Expand All @@ -153,17 +182,18 @@ private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFa

private MemberId findCoreMemberToCopyFrom()
{
RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
while ( true )
{
try
{
MemberId memberId = connectionStrategy.coreMember();
log.info( "Server starting, connecting to core server %s", memberId );
debugLog.info( "Server starting, connecting to core server %s", memberId );
return memberId;
}
catch ( CoreMemberSelectionException ex )
{
log.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() );
debugLog.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() );
LockSupport.parkUntil( timeout.getMillis() + System.currentTimeMillis() );
timeout.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.junit.Assert.assertEquals;

public class ExponentialBackoffStrategyTest
Expand All @@ -33,7 +32,7 @@ public class ExponentialBackoffStrategyTest
public void shouldDoubleEachTime() throws Exception
{
// given
ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, MILLISECONDS );
ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, 1 << NUMBER_OF_ACCESSES, MILLISECONDS );
RetryStrategy.Timeout timeout = strategy.newTimeout();

// when
Expand All @@ -43,23 +42,48 @@ public void shouldDoubleEachTime() throws Exception
}

// then
assertEquals( 2 << NUMBER_OF_ACCESSES - 1, timeout.getMillis() );
assertEquals( 1 << NUMBER_OF_ACCESSES, timeout.getMillis() );
}

@Test
public void shouldProvidePreviousTimeout() throws Exception
{
// given
ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, MILLISECONDS );
ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, 1 << NUMBER_OF_ACCESSES, MILLISECONDS );
RetryStrategy.Timeout timeout = strategy.newTimeout();

// when
for ( int i = 0; i < NUMBER_OF_ACCESSES; i++ )
{
timeout.increment();
}

// then
assertEquals( 1 << NUMBER_OF_ACCESSES, timeout.getMillis() );
}

@Test
public void shouldRespectUpperBound() throws Exception
{
// given
long upperBound = (1 << NUMBER_OF_ACCESSES) - 5;
ExponentialBackoffStrategy strategy = new ExponentialBackoffStrategy( 1, upperBound, MILLISECONDS );
RetryStrategy.Timeout timeout = strategy.newTimeout();

// when
for ( int i = 0; i <= NUMBER_OF_ACCESSES; i++ )
for ( int i = 0; i < NUMBER_OF_ACCESSES; i++ )
{
timeout.increment();
}

assertEquals( upperBound, timeout.getMillis() );

// additional increments
timeout.increment();
timeout.increment();
timeout.increment();

// then
assertEquals( 2 << NUMBER_OF_ACCESSES, timeout.getMillis() );
assertEquals( upperBound, timeout.getMillis() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void shouldReplaceEmptyStoreWithRemote() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
readReplicaStartupProcess.start();
Expand All @@ -105,7 +105,7 @@ public void shouldNotStartWithMismatchedNonEmptyStore() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
try
Expand Down Expand Up @@ -135,7 +135,7 @@ public void shouldStartWithMatchingDatabase() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
readReplicaStartupProcess.start();
Expand All @@ -155,7 +155,7 @@ public void stopShouldStopTheDatabaseAndStopPolling() throws Throwable
ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
NullLogProvider.getInstance(), copiedStoreRecovery );
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );
readReplicaStartupProcess.start();

// when
Expand Down

0 comments on commit a7ca610

Please sign in to comment.