Skip to content

Commit

Permalink
Retry Edge Server startup if failed on download from core server.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Sep 19, 2016
1 parent 1bf0771 commit 78be46c
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 48 deletions.
Expand Up @@ -82,7 +82,8 @@ private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId,
}
}

public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) throws StoreCopyFailedException
public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )
throws StoreCopyFailedException, StreamingTransactionsFailedException
{
try
{
Expand All @@ -98,7 +99,7 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) th
CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, pullTxIndex );
if ( catchupResult != SUCCESS )
{
throw new StoreCopyFailedException( "Failed to pull transactions: " + catchupResult );
throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult );
}
}
catch ( IOException e )
Expand All @@ -109,36 +110,6 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir ) th

public StoreId getStoreIdOf( MemberId from ) throws StoreIdDownloadFailedException
{
String operation = "get store id from " + from;
long retryInterval = 5_000;
int attempts = 0;

while ( attempts++ < 5 )
{
log.info( "Attempt #%d to %s.", attempts, operation );

try
{
return storeCopyClient.fetchStoreId( from );
}
catch ( StoreIdDownloadFailedException e )
{
log.info( "Attempt #%d to %s failed.", attempts, operation );
}

try
{
log.info( "Next attempt to %s in %d ms.", operation, retryInterval );
Thread.sleep( retryInterval );
retryInterval = retryInterval * 2;
}
catch ( InterruptedException e )
{
Thread.interrupted();
throw new StoreIdDownloadFailedException( e );
}
}

throw new StoreIdDownloadFailedException( "Failed to " + operation + " after " + (attempts - 1) + " attempts" );
return storeCopyClient.fetchStoreId( from );
}
}
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.storecopy;

public class StreamingTransactionsFailedException extends Exception
{
StreamingTransactionsFailedException( String message )
{
super( message );
}
}
Expand Up @@ -68,7 +68,7 @@ public CoreAddresses find( MemberId memberId ) throws NoKnownAddressesException
CoreAddresses coreAddresses = coreMembers.get( memberId );
if ( coreAddresses == null )
{
throw new NoKnownAddressesException();
throw new NoKnownAddressesException( "Unable to find address mapping for member: " + memberId );
}
return coreAddresses;
}
Expand Down
Expand Up @@ -21,4 +21,8 @@

public class NoKnownAddressesException extends Exception
{
public NoKnownAddressesException( String message )
{
super( message );
}
}
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.coreedge.catchup.storecopy.StoreFetcher;
import org.neo4j.coreedge.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.coreedge.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.coreedge.catchup.storecopy.TemporaryStoreDirectory;
import org.neo4j.coreedge.core.state.machines.tx.RetryStrategy;
import org.neo4j.coreedge.identity.MemberId;
Expand Down Expand Up @@ -76,7 +77,47 @@ public void init() throws Throwable
@Override
public void start() throws Throwable
{
MemberId source = findCoreMemberToCopyFrom();
long retryInterval = 5_000;
int attempts = 0;
while ( attempts++ < 5 )
{
MemberId source = findCoreMemberToCopyFrom();
try
{
tryToStart( source );
return;
}
catch ( StoreCopyFailedException e )
{
log.info( "Attempt #%d to start edge server failed while copying store files from %s.", attempts,
source );
}
catch ( StreamingTransactionsFailedException e )
{
log.info( "Attempt #%d to start edge server failed while streaming transactions from %s.", attempts,
source );
}
catch ( StoreIdDownloadFailedException e )
{
log.info( "Attempt #%d to start edge server failed while getting store id from %s.", attempts, source );
}

try
{
Thread.sleep( retryInterval );
retryInterval = retryInterval * 2;
}
catch ( InterruptedException e )
{
Thread.interrupted();
throw new RuntimeException( "Interrupted while trying to start edge server. Shutting down.", e );
}
}
throw new Exception( "Failed to start edge server after " + (attempts - 1) + " attempts" );
}

private void tryToStart( MemberId source ) throws Throwable
{
if ( localDatabase.isEmpty() )
{
log.info( "Local database is empty, attempting to replace with copy from core server %s", source );
Expand Down Expand Up @@ -112,7 +153,8 @@ private void ensureSameStoreIdAs( MemberId remoteCore ) throws StoreIdDownloadFa
}
}

private void copyWholeStoreFrom( MemberId source, StoreId expectedStoreId, StoreFetcher storeFetcher ) throws IOException, StoreCopyFailedException
private void copyWholeStoreFrom( MemberId source, StoreId expectedStoreId, StoreFetcher storeFetcher )
throws IOException, StoreCopyFailedException, StreamingTransactionsFailedException
{
TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( localDatabase.storeDir() );
storeFetcher.copyStore( source, expectedStoreId, tempStore.storeDir() );
Expand Down
Expand Up @@ -109,9 +109,9 @@ txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
edgeStartupProcess.start();
fail( "should have thrown" );
}
catch ( IllegalStateException ex )
catch ( Exception ex )
{
// expected
//expected.
}

// then
Expand Down
Expand Up @@ -19,6 +19,9 @@
*/
package org.neo4j.coreedge.scenarios;

import org.junit.Rule;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
Expand All @@ -29,16 +32,13 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;

import org.junit.Rule;
import org.junit.Test;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.CoreGraphDatabase;
import org.neo4j.coreedge.core.consensus.log.segmented.FileNames;
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.CoreClusterMember;
import org.neo4j.coreedge.discovery.EdgeClusterMember;
import org.neo4j.coreedge.core.consensus.log.segmented.FileNames;
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.CoreGraphDatabase;
import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory;
import org.neo4j.coreedge.edge.EdgeGraphDatabase;
import org.neo4j.function.ThrowingSupplier;
Expand Down Expand Up @@ -67,7 +67,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
Expand All @@ -77,7 +76,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME;
import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME;
import static org.neo4j.function.Predicates.awaitEx;
Expand Down Expand Up @@ -203,7 +201,7 @@ public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreI
{
// Lifecycle should throw exception, server should not start.
assertThat( required.getCause(), instanceOf( LifecycleException.class ) );
assertThat( required.getCause().getCause(), instanceOf( IllegalStateException.class ) );
assertThat( required.getCause().getCause(), instanceOf( Exception.class ) );
assertThat( required.getCause().getCause().getMessage(),
containsString( "This edge machine cannot join the cluster. " +
"The local database is not empty and has a mismatching storeId:" ) );
Expand Down

0 comments on commit 78be46c

Please sign in to comment.