Skip to content

Commit

Permalink
NetworkReceiver/NetworkSender can now be paused, for better testing p…
Browse files Browse the repository at this point in the history
…urposes

Removed HA BackupProvider as it is no longer used
Refactored ClusterClient into factory+facade
Always use "127.0.0.1" for localhost in cluster testing
(cherry picked from commit ba53e31)
  • Loading branch information
rickardoberg committed Jun 11, 2015
1 parent 9cd0a79 commit 9596412
Show file tree
Hide file tree
Showing 32 changed files with 410 additions and 404 deletions.
Expand Up @@ -25,7 +25,7 @@

public class LifeRule implements TestRule
{
private LifeSupport life = new LifeSupport( );
private LifeSupport life = new LifeSupport();
private final boolean autoStart;

public LifeRule()
Expand Down Expand Up @@ -66,17 +66,18 @@ public void evaluate() throws Throwable
failure.addSuppressed( suppressed );
}
throw failure;
} finally
}
finally
{
life = new LifeSupport( );
life = new LifeSupport();
}
}
};
}

public <T> T add( T instance )
{
return life.add(instance);
return life.add( instance );
}


Expand Down
Expand Up @@ -27,6 +27,7 @@

import org.neo4j.cluster.com.NetworkReceiver;
import org.neo4j.cluster.com.NetworkSender;
import org.neo4j.cluster.protocol.atomicbroadcast.AtomicBroadcastSerializer;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AcceptorInstanceStore;
Expand Down
Expand Up @@ -19,9 +19,7 @@
*/
package org.neo4j.cluster.client;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -203,14 +201,7 @@ public boolean isFullHaMember()

private static String localhost()
{
try
{
return InetAddress.getLocalHost().getHostAddress();
}
catch ( UnknownHostException e )
{
throw new RuntimeException( e );
}
return "127.0.0.1";
}

public String getHost()
Expand Down
Expand Up @@ -67,7 +67,7 @@ public class NetworkReceiver
implements MessageSource, Lifecycle
{
public interface Monitor
extends NamedThreadFactory.Monitor
extends NamedThreadFactory.Monitor
{
void receivedMessage( Message message );

Expand Down Expand Up @@ -112,6 +112,8 @@ public interface NetworkChannelsListener

volatile boolean bindingDetected = false;

private volatile boolean paused;

public NetworkReceiver( Monitor monitor, Configuration config, Logging logging )
{
this.monitor = monitor;
Expand Down Expand Up @@ -166,6 +168,11 @@ public void shutdown()
{
}

public void setPaused( boolean paused )
{
this.paused = paused;
}

private void listen( int minPort, int maxPort )
throws URISyntaxException, ChannelException, UnknownHostException
{
Expand All @@ -177,7 +184,7 @@ private void listen( int minPort, int maxPort )
InetAddress host;
String address = config.clusterServer().getHost();
InetSocketAddress localAddress;
if ( address == null || address.equals( INADDR_ANY ))
if ( address == null || address.equals( INADDR_ANY ) )
{
localAddress = new InetSocketAddress( checkPort );
}
Expand Down Expand Up @@ -212,6 +219,11 @@ public void addMessageProcessor( MessageProcessor processor )

public void receive( Message message )
{
if ( paused )
{
return;
}

for ( MessageProcessor processor : processors )
{
try
Expand All @@ -234,14 +246,21 @@ private URI getURI( InetSocketAddress address ) throws URISyntaxException
{
String uri;

if (address.getAddress().getHostAddress().startsWith( "0" ))
uri = CLUSTER_SCHEME + "://0.0.0.0:"+address.getPort(); // Socket.toString() already prepends a /
if ( address.getAddress().getHostAddress().startsWith( "0" ) )
{
uri = CLUSTER_SCHEME + "://0.0.0.0:" + address.getPort(); // Socket.toString() already prepends a /
}
else
uri = CLUSTER_SCHEME + "://" + address.getAddress().getHostAddress()+":"+address.getPort(); // Socket.toString() already prepends a /
{
uri = CLUSTER_SCHEME + "://" + address.getAddress().getHostAddress() + ":" + address.getPort(); // Socket
}
// .toString() already prepends a /

// Add name if given
if (config.name() != null)
uri += "/?name="+config.name();
if ( config.name() != null )
{
uri += "/?name=" + config.name();
}

return URI.create( uri );
}
Expand Down Expand Up @@ -302,7 +321,8 @@ private class NetworkNodePipelineFactory
public ChannelPipeline getPipeline() throws Exception
{
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast( "frameDecoder",new ObjectDecoder( 1024 * 1000, NetworkNodePipelineFactory.this.getClass().getClassLoader() ) );
pipeline.addLast( "frameDecoder",
new ObjectDecoder( 1024 * 1000, NetworkNodePipelineFactory.this.getClass().getClassLoader() ) );
pipeline.addLast( "serverHandler", new MessageReceiver() );
return pipeline;
}
Expand All @@ -322,9 +342,9 @@ public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent e ) throws
@Override
public void messageReceived( ChannelHandlerContext ctx, MessageEvent event ) throws Exception
{
if (!bindingDetected)
if ( !bindingDetected )
{
InetSocketAddress local = ((InetSocketAddress)event.getChannel().getLocalAddress());
InetSocketAddress local = ((InetSocketAddress) event.getChannel().getLocalAddress());
bindingDetected = true;
listeningAt( getURI( local ) );
}
Expand All @@ -335,7 +355,7 @@ public void messageReceived( ChannelHandlerContext ctx, MessageEvent event ) thr
InetSocketAddress remote = (InetSocketAddress) ctx.getChannel().getRemoteAddress();
String remoteAddress = remote.getAddress().getHostAddress();
URI fromHeader = URI.create( message.getHeader( Message.FROM ) );
fromHeader = URI.create(fromHeader.getScheme()+"://"+remoteAddress + ":" + fromHeader.getPort());
fromHeader = URI.create( fromHeader.getScheme() + "://" + remoteAddress + ":" + fromHeader.getPort() );
message.setHeader( Message.FROM, fromHeader.toASCIIString() );

msgLog.debug( "Received:" + message );
Expand Down
Expand Up @@ -76,7 +76,7 @@ public class NetworkSender
implements MessageSender, Lifecycle
{
public interface Monitor
extends NamedThreadFactory.Monitor
extends NamedThreadFactory.Monitor
{
void queuedMessage( Message message );

Expand Down Expand Up @@ -115,6 +115,8 @@ public interface NetworkChannelsListener
private Map<URI, Channel> connections = new ConcurrentHashMap<URI, Channel>();
private Iterable<NetworkChannelsListener> listeners = Listeners.newListeners();

private volatile boolean paused;

public NetworkSender( Monitor monitor, Configuration config, NetworkReceiver receiver, Logging logging )
{
this.monitor = monitor;
Expand Down Expand Up @@ -215,6 +217,11 @@ public void process( final List<Message<? extends MessageType>> messages )
@Override
public boolean process( Message<? extends MessageType> message )
{
if ( paused )
{
return true;
}

if ( message.hasHeader( Message.TO ) )
{
send( message );
Expand All @@ -227,6 +234,11 @@ public boolean process( Message<? extends MessageType> message )
return true;
}

public void setPaused( boolean paused )
{
this.paused = paused;
}


private URI getURI( InetSocketAddress address ) throws URISyntaxException
{
Expand Down Expand Up @@ -302,7 +314,7 @@ public void operationComplete( ChannelFuture future ) throws Exception
}
catch ( Exception e )
{
if( Exceptions.contains(e, ClosedChannelException.class ))
if ( Exceptions.contains( e, ClosedChannelException.class ) )
{
msgLog.warn( "Could not send message, because the connection has been closed." );
}
Expand Down Expand Up @@ -450,7 +462,7 @@ public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e ) throw
if ( !(cause instanceof ConnectException || cause instanceof RejectedExecutionException) )
{
// If we keep getting the same exception, only output the first one
if (lastException != null && !lastException.getClass().equals( cause.getClass() ))
if ( lastException != null && !lastException.getClass().equals( cause.getClass() ) )
{
msgLog.error( "Receive exception:", cause );
lastException = cause;
Expand All @@ -461,9 +473,9 @@ public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e ) throw
@Override
public void writeComplete( ChannelHandlerContext ctx, WriteCompletionEvent e ) throws Exception
{
if (lastException != null)
if ( lastException != null )
{
msgLog.error( "Recovered from:", lastException);
msgLog.error( "Recovered from:", lastException );
lastException = null;
}
super.writeComplete( ctx, e );
Expand Down
Expand Up @@ -107,7 +107,6 @@ public void memberIsAvailable( String role, URI roleUri, StoreId storeId )
{
MemberIsAvailable message = new MemberIsAvailable( role, myId, serverClusterId, roleUri, storeId );
Payload payload = serializer.broadcast( message );
serializer.receive( payload );
atomicBroadcast.broadcast( payload );
}
catch ( Throwable e )
Expand All @@ -122,7 +121,6 @@ public void memberIsUnavailable( String role )
try
{
Payload payload = serializer.broadcast( new MemberIsUnavailable( role, myId, serverClusterId ) );
serializer.receive( payload );
atomicBroadcast.broadcast( payload );
}
catch ( Throwable e )
Expand Down
Expand Up @@ -33,6 +33,11 @@ public class AtomicBroadcastSerializer
private ObjectInputStreamFactory objectInputStreamFactory;
private ObjectOutputStreamFactory objectOutputStreamFactory;

public AtomicBroadcastSerializer()
{
this(new ObjectStreamFactory(), new ObjectStreamFactory());
}

public AtomicBroadcastSerializer( ObjectInputStreamFactory objectInputStreamFactory,
ObjectOutputStreamFactory objectOutputStreamFactory )
{
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;

/**
* AtomicBroadcast payload. Wraps a byte buffer and its length.
Expand Down Expand Up @@ -72,4 +73,38 @@ public void readExternal( ObjectInput in )
buf = new byte[len];
in.read( buf, 0, len );
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}

Payload payload = (Payload) o;

if ( len != payload.len )
{
return false;
}
if ( !Arrays.equals( buf, payload.buf ) )
{
return false;
}

return true;
}

@Override
public int hashCode()
{
int result = buf != null ? Arrays.hashCode( buf ) : 0;
result = 31 * result + len;
return result;
}
}
Expand Up @@ -22,6 +22,9 @@
import java.io.Serializable;

import org.neo4j.cluster.com.message.MessageType;
import org.neo4j.cluster.protocol.atomicbroadcast.AtomicBroadcastSerializer;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.Payload;

/**
* Acceptor state machine messages
Expand Down Expand Up @@ -145,7 +148,20 @@ public int hashCode()
@Override
public String toString()
{
return "AcceptState{" + "ballot=" + ballot + ", value=" + value + "}";
Object toStringValue = value;
if (toStringValue instanceof Payload )
{
try
{
toStringValue = new AtomicBroadcastSerializer( new ObjectStreamFactory(), new ObjectStreamFactory() ).receive( (Payload) toStringValue);
}
catch ( Throwable e )
{
// Ignore
}
}

return "AcceptState{" + "ballot=" + ballot + ", value=" + toStringValue + "}";
}
}
}

0 comments on commit 9596412

Please sign in to comment.