Skip to content

Commit

Permalink
remove timeout for connecting to core
Browse files Browse the repository at this point in the history
It is not clear why it was added to begin with. The simplest solution is just
to let it keep connecting until the core becomes available.
  • Loading branch information
martinfurmanski committed Jan 17, 2017
1 parent 25f6bc4 commit e1369b8
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 91 deletions.
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.GlobalSessionTrackerState;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.state.machines.tx.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
Expand Down Expand Up @@ -67,8 +67,9 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession );
progressTracker = new ProgressTrackerImpl( myGlobalSession );

ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 10, 60, SECONDS );
replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool,
progressTracker, new ExponentialBackoffStrategy( 10, 60, SECONDS ) ) );
progressTracker, retryStrategy ) );
}

public RaftReplicator getReplicator()
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.replication.session.OperationContext;
import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy;
import org.neo4j.causalclustering.helper.RetryStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand Down

This file was deleted.

Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.machines.tx;
package org.neo4j.causalclustering.helper;

import java.util.concurrent.TimeUnit;

Expand Down
Expand Up @@ -17,10 +17,13 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.machines.tx;
package org.neo4j.causalclustering.helper;

import java.util.concurrent.TimeUnit;

/**
* Exponential backoff strategy helper class.
*/
public class ExponentialBackoffStrategy implements RetryStrategy
{
private final long initialBackoffTimeMillis;
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.machines.tx;
package org.neo4j.causalclustering.helper;

public interface RetryStrategy
{
Expand Down
Expand Up @@ -37,7 +37,7 @@
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.core.state.machines.tx.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.discovery.procedures.ReadReplicaRoleProcedure;
Expand Down Expand Up @@ -249,9 +249,10 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
txPulling.add( catchupTimeoutService );
txPulling.add( new WaitForUpToDateStore( catchupProcess, logProvider ) );

ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS );
life.add( new ReadReplicaStartupProcess( platformModule.fileSystem, storeFetcher, localDatabase, txPulling,
new ConnectToRandomCoreMember( discoveryService ),
new ExponentialBackoffStrategy( 1, 30, TimeUnit.SECONDS ), logProvider,
retryStrategy, logProvider,
platformModule.logging.getUserLogProvider(), copiedStoreRecovery ) );

dependencies.satisfyDependency( createSessionTracker() );
Expand Down
Expand Up @@ -20,15 +20,14 @@
package org.neo4j.causalclustering.readreplica;

import java.io.IOException;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy;
import org.neo4j.causalclustering.helper.RetryStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.routing.CoreMemberSelectionException;
Expand All @@ -42,8 +41,6 @@

class ReadReplicaStartupProcess implements Lifecycle
{
private static final int MAX_ATTEMPTS = 5;

private final FileSystemAbstraction fs;
private final StoreFetcher storeFetcher;
private final LocalDatabase localDatabase;
Expand Down Expand Up @@ -88,14 +85,22 @@ public void start() throws IOException
{
boolean syncedWithCore = false;
RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
for ( int attempt = 1; attempt <= MAX_ATTEMPTS && !syncedWithCore; attempt++ )
int attempt = 0;
while ( !syncedWithCore )
{
MemberId source = findCoreMemberToCopyFrom();
attempt++;
MemberId source = null;
try
{
source = connectionStrategy.coreMember();
syncStoreWithCore( source );
syncedWithCore = true;
}
catch ( CoreMemberSelectionException e )
{
lastIssue = issueOf( "finding core member", attempt );
debugLog.warn( lastIssue );
}
catch ( StoreCopyFailedException e )
{
lastIssue = issueOf( format( "copying store files from %s", source ), attempt );
Expand Down Expand Up @@ -124,8 +129,6 @@ public void start() throws IOException
debugLog.warn( lastIssue );
break;
}

attempt++;
}

if ( !syncedWithCore )
Expand Down Expand Up @@ -180,26 +183,6 @@ private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFa
}
}

/**
 * Blocks until a core cluster member can be selected to copy the store from,
 * retrying forever with a backing-off delay between attempts.
 * <p>
 * NOTE(review): this commit deletes this method — the retry loop is folded into
 * {@code start()} so the overall startup retry strategy applies instead of a
 * separate unbounded inner loop.
 *
 * @return the selected core member (never null; does not return until selection succeeds)
 */
private MemberId findCoreMemberToCopyFrom()
{
// Fresh timeout so the backoff restarts for this selection attempt sequence.
RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
while ( true )
{
try
{
MemberId memberId = connectionStrategy.coreMember();
debugLog.info( "Server starting, connecting to core server %s", memberId );
return memberId;
}
catch ( CoreMemberSelectionException ex )
{
debugLog.info( "Failed to connect to core server. Retrying in %d ms.", timeout.getMillis() );
// parkUntil takes an absolute deadline, hence current time + backoff delay.
// NOTE(review): parkUntil may return spuriously/early; acceptable here since
// the loop simply retries — no correctness impact, only a possibly shorter wait.
LockSupport.parkUntil( timeout.getMillis() + System.currentTimeMillis() );
// Grow the delay for the next attempt (exponential backoff per the strategy).
timeout.increment();
}
}
}

@Override
public void stop() throws Throwable
{
Expand Down
Expand Up @@ -32,13 +32,12 @@
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.state.machines.tx.ConstantTimeRetryStrategy;
import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy;
import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy;
import org.neo4j.causalclustering.helper.RetryStrategy;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.identity.MemberId;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -56,7 +55,7 @@ public class RaftReplicatorTest
private MemberId leader = new MemberId( UUID.randomUUID() );
private GlobalSession session = new GlobalSession( UUID.randomUUID(), myself );
private LocalSessionPool sessionPool = new LocalSessionPool( session );
private RetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 1, SECONDS );
private RetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 0, MILLISECONDS );

@Test
public void shouldSendReplicatedContentToLeader() throws Exception
Expand Down Expand Up @@ -94,7 +93,6 @@ public void shouldResendAfterTimeout() throws Exception
CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
CapturingOutbound outbound = new CapturingOutbound();

ConstantTimeRetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 100, MILLISECONDS );
RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, retryStrategy );

Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.state.machines.tx;
package org.neo4j.causalclustering.helper;

import org.junit.Test;

Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreFetcher;
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.core.state.machines.tx.ConstantTimeRetryStrategy;
import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy;
import org.neo4j.causalclustering.discovery.CoreTopology;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
Expand All @@ -52,6 +52,7 @@

public class ReadReplicaStartupProcessTest
{
private ConstantTimeRetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 1, MILLISECONDS );
private CopiedStoreRecovery copiedStoreRecovery = mock( CopiedStoreRecovery.class );
private FileSystemAbstraction fs = mock( FileSystemAbstraction.class );
private StoreFetcher storeFetcher = mock( StoreFetcher.class );
Expand Down Expand Up @@ -83,7 +84,7 @@ public void shouldReplaceEmptyStoreWithRemote() throws Throwable

ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
new AlwaysChooseFirstMember( hazelcastTopology ), retryStrategy,
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
Expand All @@ -104,7 +105,7 @@ public void shouldNotStartWithMismatchedNonEmptyStore() throws Throwable

ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
new AlwaysChooseFirstMember( hazelcastTopology ), retryStrategy,
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
Expand Down Expand Up @@ -134,7 +135,7 @@ public void shouldStartWithMatchingDatabase() throws Throwable

ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
new AlwaysChooseFirstMember( hazelcastTopology ), retryStrategy,
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );

// when
Expand All @@ -154,7 +155,7 @@ public void stopShouldStopTheDatabaseAndStopPolling() throws Throwable

ReadReplicaStartupProcess
readReplicaStartupProcess = new ReadReplicaStartupProcess( fs, storeFetcher, localDatabase, txPulling,
new AlwaysChooseFirstMember( hazelcastTopology ), new ConstantTimeRetryStrategy( 1, MILLISECONDS ),
new AlwaysChooseFirstMember( hazelcastTopology ), retryStrategy,
NullLogProvider.getInstance(), NullLogProvider.getInstance(), copiedStoreRecovery );
readReplicaStartupProcess.start();

Expand Down

0 comments on commit e1369b8

Please sign in to comment.