Skip to content

Commit

Permalink
Merge pull request #7832 from mneedham/3.1-retry-get-store-id
Browse files Browse the repository at this point in the history
Changes to make edge servers restart properly in tyre kicking
  • Loading branch information
mneedham committed Aug 31, 2016
2 parents dd79d52 + 842cbfd commit 26bacc0
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ public <T> T makeBlockingRequest( MemberId memberId, CatchUpRequest request,
channel.setResponseHandler( responseHandler, future );
channel.send( request );

return TimeoutLoop.waitForCompletion( future, channel::millisSinceLastResponse, inactivityTimeout, timeUnit );
String operation = String.format( "Timed out executing operation %s on %s", request, memberId );

return TimeoutLoop.waitForCompletion( future, operation,
channel::millisSinceLastResponse, inactivityTimeout, timeUnit );
}

private synchronized void dispose( CatchUpChannel channel )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ public CatchUpClientException( Throwable cause )
{
super( cause );
}

/**
 * Creates an exception that carries a description of the operation being performed
 * together with the underlying cause. Both arguments are forwarded unchanged to the
 * superclass constructor.
 *
 * @param operation text describing the operation that failed
 * @param cause the underlying failure
 */
public CatchUpClientException( String operation, Throwable cause )
{
super( operation, cause );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

class TimeoutLoop
{
static <T> T waitForCompletion( Future<T> future, Supplier<Long> millisSinceLastResponseSupplier,
static <T> T waitForCompletion( Future<T> future, String operation, Supplier<Long> millisSinceLastResponseSupplier,
long inactivityTimeout, TimeUnit timeUnit ) throws CatchUpClientException
{
long remainingTimeoutMillis = timeUnit.toMillis( inactivityTimeout );
Expand Down Expand Up @@ -55,7 +55,7 @@ static <T> T waitForCompletion( Future<T> future, Supplier<Long> millisSinceLast
}
else
{
throw new CatchUpClientException( e );
throw new CatchUpClientException( operation, e );
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,18 @@ public void bringUpToDateOrReplaceStoreFrom( MemberId source, StoreId wantedStor
boolean successfullyCaughtUp = false;
if ( wantedStoreId.equals( storeId ) )
{
log.info( "Bringing store up to date with %s", source );
successfullyCaughtUp = tryToCatchUp( source, storeFetcher );

if ( !successfullyCaughtUp )
{
log.info( "Failed to bring store up to date with %s.", source );
}
}

if ( !successfullyCaughtUp )
{
log.info( "Deleting local store and downloading new store from %s.", source );
storeFiles.delete( storeDir );
copyWholeStoreFrom( source, wantedStoreId, storeFetcher );
}
Expand Down Expand Up @@ -168,7 +175,9 @@ public void ensureSameStoreId( MemberId memberId, StoreFetcher storeFetcher )
throws StoreIdDownloadFailedException
{
StoreId localStoreId = storeId();
log.info( "Getting StoreId from %s", memberId );
StoreId remoteStoreId = storeFetcher.storeId( memberId );
log.info( "Got StoreId %s from %s", remoteStoreId, memberId );
if ( !localStoreId.equals( remoteStoreId ) )
{
throw new IllegalStateException( format( "This edge machine cannot join the cluster. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ boolean tryCatchingUp( MemberId from, StoreId storeId, File storeDir ) throws St
ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir );
long lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId();

try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider
) )
{
log.info( "Pulling transactions from: %d", lastCommittedTransactionId );
try
{
long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastCommittedTransactionId, writer );
long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastCommittedTransactionId,
writer );
log.info( "Txs streamed up to %d", lastPulledTxId );
return true;
}
Expand All @@ -86,7 +88,8 @@ void copyStore( MemberId from, StoreId storeId, File storeDir ) throws StoreCopy
long lastFlushedTxId = storeCopyClient.copyStoreFiles( from, new StreamToDisk( storeDir, fs ) );
log.info( "Store files streamed up to %d", lastFlushedTxId );

try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache,
logProvider ) )
{
log.info( "Pulling transactions from: %d", lastFlushedTxId - 1 );
long lastPulledTxId = txPullClient.pullTransactions( from, storeId, lastFlushedTxId - 1, writer );
Expand All @@ -101,6 +104,36 @@ void copyStore( MemberId from, StoreId storeId, File storeDir ) throws StoreCopy

/**
 * Fetches the store id from the given member, retrying when the download fails.
 *
 * Up to five attempts are made. After a failed attempt (other than the last one) the
 * calling thread sleeps before trying again; the wait starts at 5000 ms and doubles
 * after every sleep.
 *
 * @param from the member to fetch the store id from
 * @return the store id reported by {@code from}
 * @throws StoreIdDownloadFailedException if every attempt fails, or if the thread is
 * interrupted while waiting between attempts (the interrupt flag is left set in that case)
 */
public StoreId storeId( MemberId from ) throws StoreIdDownloadFailedException
{
    String operation = "get store id";
    int maxAttempts = 5;
    long retryInterval = 5_000;

    for ( int attempt = 1; attempt <= maxAttempts; attempt++ )
    {
        log.info( "Attempt #%d to %s.", attempt, operation );

        try
        {
            return storeCopyClient.fetchStoreId( from );
        }
        catch ( StoreIdDownloadFailedException e )
        {
            log.info( "Attempt #%d to %s failed.", attempt, operation );
        }

        if ( attempt == maxAttempts )
        {
            // Nothing follows the final attempt, so sleeping here would only delay the failure.
            break;
        }

        try
        {
            log.info( "Next attempt to %s in %d ms.", operation, retryInterval );
            Thread.sleep( retryInterval );
            retryInterval = retryInterval * 2;
        }
        catch ( InterruptedException e )
        {
            // Thread.sleep cleared the interrupt flag when it threw; set it again so that
            // callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new StoreIdDownloadFailedException( e );
        }
    }

    throw new StoreIdDownloadFailedException( "Failed to get store id after " + maxAttempts + " attempts" );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ public StoreIdDownloadFailedException( Throwable cause )
{
super( cause );
}

/**
 * Creates an exception described only by a message, for failures that have no
 * underlying cause to wrap. The message is forwarded unchanged to the superclass
 * constructor.
 *
 * @param message text describing why the store id could not be downloaded
 */
public StoreIdDownloadFailedException( String message )
{
super( message );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public ClusterTopology currentTopology()
/**
 * Creates the timeout registered under REFRESH_EDGE and stores it in edgeRefreshTimer.
 * Each time the timeout fires, the handler renews the timeout and calls retry with a
 * callback that invokes addEdgeServer on the supplied Hazelcast instance.
 */
public void start() throws Throwable
{
edgeRefreshTimer = renewableTimeoutService.create( REFRESH_EDGE, edgeRefreshRate, 0, timeout -> {
// NOTE(review): the retry call appears both before and after timeout.renew() in this
// listing. This looks like the pre-change and post-change position of a single call
// shown together - confirm against the real file which one is live.
retry( ( hazelcastInstance ) -> addEdgeServer( hazelcastInstance ) );
timeout.renew();
retry( ( hazelcastInstance ) -> addEdgeServer( hazelcastInstance ) );
} );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,18 @@ public void start() throws Throwable
MemberId memberId = findCoreMemberToCopyFrom();
if ( localDatabase.isEmpty() )
{
log.info( "Local database is empty. Stopping local database" );
localDatabase.stop();
log.info( "Getting StoreId from %s", memberId );
StoreId storeId = storeFetcher.storeId( memberId );
log.info( "Got StoreId %s from %s", storeId, memberId );
localDatabase.bringUpToDateOrReplaceStoreFrom( memberId, storeId, storeFetcher );
localDatabase.start();
}
else
{
log.info( "Already have store with StoreId %s. Checking that it matches the StoreId of %s",
localDatabase.storeId(), memberId );
localDatabase.ensureSameStoreId( memberId, storeFetcher );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void shouldReturnImmediatelyIfFutureIsAlreadyComplete() throws Exception
Supplier<Long> lastResponseSupplier = () -> 1L;

// when
long value = TimeoutLoop.<Long>waitForCompletion( future, lastResponseSupplier, 2, MILLISECONDS );
long value = TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 2, MILLISECONDS );

// then
assertEquals( 12L, value );
Expand All @@ -65,7 +65,7 @@ public void shouldTimeoutIfNoActivity() throws Exception
try
{
// when
TimeoutLoop.<Long>waitForCompletion( future, lastResponseSupplier, 1, MILLISECONDS );
TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 1, MILLISECONDS );
fail( "Should have timed out" );
}
catch ( CatchUpClientException e )
Expand All @@ -86,7 +86,7 @@ public void shouldKeepWaitingIfThereIsSomeActivity() throws Exception
Supplier<Long> lastResponseSupplier = () -> 1L;

// when
long value = TimeoutLoop.<Long>waitForCompletion( future, lastResponseSupplier, 2, MILLISECONDS );
long value = TimeoutLoop.<Long>waitForCompletion( future, "", lastResponseSupplier, 2, MILLISECONDS );

// then
assertEquals( 12L, value );
Expand All @@ -102,7 +102,7 @@ public void shouldTranslateExecutionExceptionToCatchUpClientException() throws E
// when
try
{
TimeoutLoop.<Long>waitForCompletion( future, () -> 1L, 2, MILLISECONDS );
TimeoutLoop.<Long>waitForCompletion( future, "", () -> 1L, 2, MILLISECONDS );
fail( "Should have thrown exception" );
}
catch ( CatchUpClientException e )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ public void start() throws InterruptedException, ExecutionException
}
}

/**
 * Starts the core members on a dedicated cached thread pool whose threads come from a
 * NamedThreadFactory named "cluster-starter". The pool is shut down once starting has
 * returned or thrown.
 */
public void startCoreMembers() throws InterruptedException, ExecutionException
{
    NamedThreadFactory starterThreads = new NamedThreadFactory( "cluster-starter" );
    ExecutorService starterPool = Executors.newCachedThreadPool( starterThreads );
    try
    {
        startCoreMembers( starterPool );
    }
    finally
    {
        // Always release the pool, even when starting the members failed.
        starterPool.shutdown();
    }
}

public Set<CoreClusterMember> healthyCoreMembers()
{
return coreMembers.values().stream()
Expand Down Expand Up @@ -160,7 +173,7 @@ public void shutdown() throws ExecutionException, InterruptedException
shutdownEdgeMembers();
}

private void shutdownCoreMembers() throws InterruptedException, ExecutionException
public void shutdownCoreMembers() throws InterruptedException, ExecutionException
{
ExecutorService executor = Executors.newCachedThreadPool();
List<Callable<Object>> memberShutdownSuppliers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
Mockito.verify( localDatabase ).start();
Mockito.verify( localDatabase ).isEmpty();
Mockito.verify( localDatabase ).ensureSameStoreId( memberId, storeFetcher );
Mockito.verify( localDatabase ).storeId();
Mockito.verifyNoMoreInteractions( localDatabase, dataSourceManager );
Mockito.verifyZeroInteractions( txPulling );
}
Expand Down Expand Up @@ -161,6 +162,7 @@ txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
Mockito.verify( localDatabase ).isEmpty();
Mockito.verify( localDatabase ).ensureSameStoreId( memberId, storeFetcher );
Mockito.verify( localDatabase ).start();
Mockito.verify( localDatabase ).storeId();
Mockito.verify( txPulling ).start();
Mockito.verifyNoMoreInteractions( localDatabase, txPulling );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
*/
package org.neo4j.coreedge.scenarios;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

import org.neo4j.collection.RawIterator;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory;
Expand All @@ -48,8 +48,10 @@
import org.neo4j.test.coreedge.ClusterRule;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;

import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureName;
import static org.neo4j.kernel.api.security.AccessMode.Static.READ;
import static org.neo4j.test.assertion.Assert.assertEventually;
Expand Down Expand Up @@ -131,6 +133,30 @@ public void shouldDiscoverCoreAndEdgeMembers() throws Exception
}
}

/**
 * Starts a cluster of three core and three edge members, shuts the core members down and
 * starts them again, then checks that the overview seen from each core server eventually
 * lists all six addresses with one leader, two followers and three read replicas.
 */
@Test
public void shouldDiscoverEdgeMembersAfterRestartingCores() throws Exception
{
    // given
    clusterRule.withNumberOfCoreMembers( 3 );
    clusterRule.withNumberOfEdgeMembers( 3 );

    // when
    Cluster cluster = clusterRule.startCluster();
    cluster.shutdownCoreMembers();
    cluster.startCoreMembers();

    Matcher<List<MemberInfo>> expectedOverview = allOf(
            containsAddress( "127.0.0.1:8000" ), containsAddress( "127.0.0.1:8001" ), containsAddress( "127.0.0.1:8002" ),
            containsAddress( "127.0.0.1:9000" ), containsAddress( "127.0.0.1:9001" ), containsAddress( "127.0.0.1:9002" ),
            containsRole( Role.LEADER, 1 ), containsRole( Role.FOLLOWER, 2 ), containsRole( Role.READ_REPLICA, 3 ) );

    // then
    int coreServerId = 0;
    while ( coreServerId < 3 )
    {
        assertEventualOverview( cluster, expectedOverview, coreServerId );
        coreServerId++;
    }
}

@Test
public void shouldDiscoverNewCoreMembers() throws Exception
{
Expand Down

0 comments on commit 26bacc0

Please sign in to comment.