Skip to content

Commit

Permalink
Ignore 'Connection reset by peer' exception
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Sep 28, 2016
1 parent f86fde0 commit c6b80c4
Showing 1 changed file with 46 additions and 12 deletions.
Expand Up @@ -21,17 +21,21 @@

import io.netty.channel.Channel;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.neo4j.coreedge.catchup.CatchUpClient;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.ClusterMember;
import org.neo4j.coreedge.discovery.CoreClusterMember;
import org.neo4j.coreedge.discovery.EdgeClusterMember;
import org.neo4j.coreedge.handlers.ExceptionMonitoringHandler;
import org.neo4j.function.Predicates;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -71,7 +75,7 @@ protected void doWork()
int newMemberId = cluster.edgeMembers().size();
final EdgeClusterMember edgeClusterMember = cluster.addEdgeMemberWithId( newMemberId );

AtomicReference<Throwable> exception;
Supplier<Throwable> exception;
try
{
exception = startAndRegisterExceptionMonitor( edgeClusterMember );
Expand All @@ -92,23 +96,16 @@ protected void doWork()
}
}

private AtomicReference<Throwable> startAndRegisterExceptionMonitor( EdgeClusterMember edgeClusterMember )
private Supplier<Throwable> startAndRegisterExceptionMonitor( EdgeClusterMember edgeClusterMember )
{
edgeClusterMember.start();

// the database is create when starting the edge...
final Monitors monitors =
edgeClusterMember.database().getDependencyResolver().resolveDependency( Monitors.class );
AtomicReference<Throwable> exception = new AtomicReference<>();
monitors.addMonitorListener( new ExceptionMonitoringHandler.Monitor()
{
@Override
public void exceptionCaught( Channel channel, Throwable cause )
{
exception.set( cause );
}
}, CatchUpClient.class.getName() );
return exception;
ExceptionMonitor exceptionMonitor = new ExceptionMonitor( new ConnectionResetFilter() );
monitors.addMonitorListener( exceptionMonitor, CatchUpClient.class.getName() );
return exceptionMonitor;
}

private long txId( ClusterMember leader )
Expand All @@ -121,4 +118,41 @@ private long txId( ClusterMember leader )
return database.getDependencyResolver().resolveDependency( TransactionIdStore.class )
.getLastClosedTransactionId();
}

private static class ConnectionResetFilter implements Predicate<Throwable>
{
private static final String MSG = "Connection reset by peer";

@Override
public boolean test( Throwable throwable )
{
return (throwable instanceof IOException) && MSG.equals( throwable.getMessage() );
}
}

private static class ExceptionMonitor implements ExceptionMonitoringHandler.Monitor, Supplier<Throwable>
{
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private Predicate<Throwable> reject;

ExceptionMonitor( Predicate<Throwable> reject )
{
this.reject = reject;
}

@Override
public void exceptionCaught( Channel channel, Throwable cause )
{
if ( !reject.test( cause ) )
{
exception.set( cause );
}
}

@Override
public Throwable get()
{
return exception.get();
}
}
}

0 comments on commit c6b80c4

Please sign in to comment.