Skip to content

Commit

Permalink
Ignore more transient failures in backup stress test
Browse files Browse the repository at this point in the history
I.e., "connection reset by peer" errors and the store having been closed on
the server side during backup.

Also extracts the various exception checks into predicates that determine
whether an error is transient or not.
  • Loading branch information
davidegrohmann committed Oct 17, 2016
1 parent a245607 commit dd89171
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 68 deletions.
Expand Up @@ -20,18 +20,21 @@
package org.neo4j.coreedge.stresstests;

import java.io.File;
import java.net.ConnectException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

import org.neo4j.backup.OnlineBackup;
import org.neo4j.com.ComException;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.helpers.SocketAddress;

class BackupLoad extends RepeatUntilOnSelectedMemberCallable
{
private final Predicate<Throwable> isTransientError =
new IsConnectionException().or( new IsConnectionRestByPeer() ).or( new IsChannelClosedException() )
.or( new IsStoreClosed() );

private final File baseDirectory;
private final BiFunction<Boolean,Integer,SocketAddress> backupAddress;

Expand All @@ -56,7 +59,7 @@ protected void doWorkOnMember( boolean isCore, int id )
}
catch ( RuntimeException e )
{
if ( isConnectionError( e ) )
if ( isTransientError.test( e ) )
{
// if we could not connect, wait a bit and try again...
LockSupport.parkNanos( 10_000_000 );
Expand All @@ -70,32 +73,4 @@ protected void doWorkOnMember( boolean isCore, int id )
throw new RuntimeException( "Not consistent backup from " + address );
}
}

/**
 * Tells whether the given exception counts as a connection error, i.e. it matches either
 * {@link #isConnectionException(Throwable)} or {@link #isChannelClosedException(Throwable)}.
 *
 * @param e the exception caught while doing the backup work
 * @return {@code true} if either check matches, {@code false} otherwise
 */
private boolean isConnectionError( RuntimeException e )
{
    if ( isConnectionException( e ) )
    {
        return true;
    }
    return isChannelClosedException( e );
}

/**
 * Walks the cause chain of the given throwable looking for a {@link ConnectException}.
 *
 * @param e the throwable to inspect, may be {@code null}
 * @return {@code true} if {@code e} or any of its transitive causes is a {@link ConnectException}
 */
private boolean isConnectionException( Throwable e )
{
    // a null throwable marks the end of the cause chain: nothing matched
    return e != null && (e instanceof ConnectException || isConnectionException( e.getCause() ));
}

/**
 * Walks the cause chain of the given throwable looking for a {@link ComException} whose message
 * starts with "Channel has been closed".
 *
 * @param e the throwable to inspect, may be {@code null}
 * @return {@code true} if {@code e} or any of its transitive causes matches, {@code false} otherwise
 */
private boolean isChannelClosedException( Throwable e )
{
    if ( e == null )
    {
        // end of the cause chain reached without a match
        return false;
    }

    if ( e instanceof ComException )
    {
        String message = e.getMessage();
        if ( message != null && message.startsWith( "Channel has been closed" ) )
        {
            return true;
        }
    }

    return isChannelClosedException( e.getCause() );
}
}
Expand Up @@ -36,15 +36,14 @@
import org.neo4j.coreedge.discovery.EdgeClusterMember;
import org.neo4j.coreedge.handlers.ExceptionMonitoringHandler;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;

import static org.neo4j.function.Predicates.await;

class CatchUpLoad extends RepeatUntilCallable
{
private static final IllegalStateException databaseShutdownEx = new IllegalStateException( "database is shutdown" );
private final Predicate<Throwable> isStoreClosed = new IsStoreClosed();
private Cluster cluster;

CatchUpLoad( BooleanSupplier keepGoing, Runnable onFailure, Cluster cluster )
Expand Down Expand Up @@ -133,7 +132,7 @@ private Supplier<Throwable> startAndRegisterExceptionMonitor( EdgeClusterMember
// the database is created when starting the edge...
final Monitors monitors =
edgeClusterMember.database().getDependencyResolver().resolveDependency( Monitors.class );
ExceptionMonitor exceptionMonitor = new ExceptionMonitor( new ConnectionResetFilter() );
ExceptionMonitor exceptionMonitor = new ExceptionMonitor( new IsConnectionRestByPeer() );
monitors.addMonitorListener( exceptionMonitor, CatchUpClient.class.getName() );
return exceptionMonitor;
}
Expand All @@ -143,7 +142,7 @@ private long txId( ClusterMember member, boolean fail )
GraphDatabaseAPI database = member.database();
if ( database == null )
{
return errorValueOrThrow( fail, databaseShutdownEx );
return errorValueOrThrow( fail, new IllegalStateException( "database is shutdown" ) );
}

try
Expand All @@ -153,7 +152,7 @@ private long txId( ClusterMember member, boolean fail )
}
catch ( Throwable ex )
{
return errorValueOrThrow( fail && !isStoreClosed( ex ), ex );
return errorValueOrThrow( fail && !isStoreClosed.test( ex ), ex );
}
}

Expand All @@ -169,38 +168,6 @@ private long errorValueOrThrow( boolean fail, Throwable error )
}
}

/**
 * Walks the cause chain of the given throwable looking for a sign that the store has been closed.
 * A throwable matches if it is an {@link UnsatisfiedDependencyException}, or an
 * {@link IllegalStateException} whose message starts with "MetaDataStore for file " and ends with
 * " is closed".
 *
 * @param ex the throwable to inspect, may be {@code null}
 * @return {@code true} if {@code ex} or any of its transitive causes matches, {@code false} otherwise
 */
private boolean isStoreClosed( Throwable ex )
{
    if ( ex == null )
    {
        // end of the cause chain reached without a match
        return false;
    }

    if ( ex instanceof UnsatisfiedDependencyException )
    {
        return true;
    }

    if ( ex instanceof IllegalStateException )
    {
        // getMessage() may return null, so guard before inspecting the text
        String message = ex.getMessage();
        if ( message != null && message.startsWith( "MetaDataStore for file " ) &&
                message.endsWith( " is closed" ) )
        {
            return true;
        }
        // a non-matching IllegalStateException may still wrap a matching cause: keep walking
    }

    return isStoreClosed( ex.getCause() );
}

/**
 * Predicate accepting only an {@link IOException} whose message is exactly "Connection reset by peer".
 * The cause chain of the throwable is not inspected.
 */
private static class ConnectionResetFilter implements Predicate<Throwable>
{
    private static final String MSG = "Connection reset by peer";

    @Override
    public boolean test( Throwable throwable )
    {
        if ( !(throwable instanceof IOException) )
        {
            return false;
        }
        // constant-first equals tolerates a null message
        return MSG.equals( throwable.getMessage() );
    }
}

private static class ExceptionMonitor implements ExceptionMonitoringHandler.Monitor, Supplier<Throwable>
{
private final AtomicReference<Throwable> exception = new AtomicReference<>();
Expand Down
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.stresstests;

import java.util.function.Predicate;

import org.neo4j.com.ComException;

/**
 * Predicate matching a throwable that is, or is transitively caused by, a {@link ComException}
 * whose message starts with "Channel has been closed".
 */
class IsChannelClosedException implements Predicate<Throwable>
{
    @Override
    public boolean test( Throwable e )
    {
        // a null throwable marks the end of the cause chain: nothing matched
        return e != null && (isChannelClosed( e ) || test( e.getCause() ));
    }

    /** Checks a single throwable, without looking at its cause. */
    private static boolean isChannelClosed( Throwable candidate )
    {
        if ( !(candidate instanceof ComException) )
        {
            return false;
        }
        String message = candidate.getMessage();
        return message != null && message.startsWith( "Channel has been closed" );
    }
}
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.stresstests;

import java.net.ConnectException;
import java.util.function.Predicate;

/**
 * Predicate matching a throwable that is, or is transitively caused by, a {@link ConnectException}.
 */
class IsConnectionException implements Predicate<Throwable>
{
    @Override
    public boolean test( Throwable e )
    {
        // a null throwable marks the end of the cause chain: nothing matched
        return e != null && (e instanceof ConnectException || test( e.getCause() ));
    }
}
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.stresstests;

import java.io.IOException;
import java.util.function.Predicate;

/**
 * Predicate matching a throwable that is, or is transitively caused by, an {@link IOException}
 * whose message starts with "Connection reset by peer".
 */
class IsConnectionRestByPeer implements Predicate<Throwable>
{
    @Override
    public boolean test( Throwable e )
    {
        // a null throwable marks the end of the cause chain: nothing matched
        return e != null && (isConnectionReset( e ) || test( e.getCause() ));
    }

    /** Checks a single throwable, without looking at its cause. */
    private static boolean isConnectionReset( Throwable candidate )
    {
        if ( !(candidate instanceof IOException) )
        {
            return false;
        }
        String message = candidate.getMessage();
        return message != null && message.startsWith( "Connection reset by peer" );
    }
}
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.stresstests;

import java.util.function.Predicate;

import org.neo4j.kernel.impl.util.UnsatisfiedDependencyException;

/**
 * Predicate matching a throwable that indicates, directly or through its cause chain, that the store
 * has been closed. A throwable in the chain matches if it is an {@link UnsatisfiedDependencyException},
 * or an {@link IllegalStateException} whose message starts with "MetaDataStore for file " and ends
 * with " is closed".
 */
class IsStoreClosed implements Predicate<Throwable>
{
    /**
     * @param ex the throwable to inspect, may be {@code null}
     * @return {@code true} if {@code ex} or any of its transitive causes matches, {@code false} otherwise
     */
    @Override
    public boolean test( Throwable ex )
    {
        if ( ex == null )
        {
            // end of the cause chain reached without a match
            return false;
        }

        if ( ex instanceof UnsatisfiedDependencyException )
        {
            return true;
        }

        if ( ex instanceof IllegalStateException )
        {
            // getMessage() may return null, so guard before inspecting the text
            String message = ex.getMessage();
            if ( message != null && message.startsWith( "MetaDataStore for file " ) &&
                    message.endsWith( " is closed" ) )
            {
                return true;
            }
            // a non-matching IllegalStateException may still wrap a matching cause: keep walking
        }

        return test( ex.getCause() );
    }
}

0 comments on commit dd89171

Please sign in to comment.