Skip to content

Commit

Permalink
Revert "Merge branch '2.3' into 3.0"
Browse files Browse the repository at this point in the history
This reverts commit 6df03bc, reversing
changes made to c41236a.

This is because it uncovered some strange failurs on windows
  • Loading branch information
spacecowboy committed Mar 4, 2016
1 parent 1d79072 commit 5f855b5
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 134 deletions.
Expand Up @@ -52,12 +52,11 @@ class BackupClient extends Client<TheBackupInterface> implements TheBackupInterf

static final long BIG_READ_TIMEOUT = 40 * 1000;

public BackupClient( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId, long timeout,
public BackupClient( String hostNameOrIp, int port, LogProvider logProvider, StoreId storeId, long timeout,
ResponseUnpacker unpacker, ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor,
LogEntryReader<ReadableClosablePositionAwareChannel> reader )
{
super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, FRAME_LENGTH,
super( hostNameOrIp, port, logProvider, storeId, FRAME_LENGTH,
new ProtocolVersion( PROTOCOL_VERSION, ProtocolVersion.INTERNAL_PROTOCOL_VERSION ), timeout,
Client.DEFAULT_MAX_NUMBER_OF_CONCURRENT_CHANNELS_PER_CLIENT, FRAME_LENGTH, unpacker,
byteCounterMonitor, requestMonitor, reader );
Expand Down
Expand Up @@ -149,7 +149,7 @@ BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePor
@Override
public Response<?> copyStore( StoreWriter writer )
{
client = new BackupClient( sourceHostNameOrIp, sourcePort, null, NullLogProvider.getInstance(),
client = new BackupClient( sourceHostNameOrIp, sourcePort, NullLogProvider.getInstance(),
StoreId.DEFAULT, timeout, ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, monitors.newMonitor(
ByteCounterMonitor.class ), monitors.newMonitor( RequestMonitor.class ), entryReader );
client.start();
Expand Down Expand Up @@ -306,7 +306,7 @@ private BackupOutcome incrementalWithContext( String sourceHostNameOrIp, int sou

Monitors monitors = resolver.resolveDependency( Monitors.class );
LogProvider logProvider = resolver.resolveDependency( LogService.class ).getInternalLogProvider();
BackupClient client = new BackupClient( sourceHostNameOrIp, sourcePort, null, logProvider, targetDb.storeId(),
BackupClient client = new BackupClient( sourceHostNameOrIp, sourcePort, logProvider, targetDb.storeId(),
timeout, unpacker, monitors.newMonitor( ByteCounterMonitor.class, BackupClient.class ),
monitors.newMonitor( RequestMonitor.class, BackupClient.class ), entryReader );

Expand Down
Expand Up @@ -78,7 +78,7 @@ private void shouldGatherForensicsInFullBackupRequest( boolean forensics ) throw

LogEntryReader<ReadableClosablePositionAwareChannel> reader =
new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory() );
BackupClient client = life.add( new BackupClient( host, port, null, NullLogProvider.getInstance(), storeId, 1000,
BackupClient client = life.add( new BackupClient( host, port, NullLogProvider.getInstance(), storeId, 1000,
mock( ResponseUnpacker.class ), mock( ByteCounterMonitor.class ), mock( RequestMonitor.class ), reader ) );
ControlledBackupInterface backup = new ControlledBackupInterface();
life.add( new BackupServer( backup, new HostnamePort( host, port ), NullLogProvider.getInstance(), mock( ByteCounterMonitor.class ),
Expand Down
Expand Up @@ -403,27 +403,32 @@ public void addNetworkChannelsListener( NetworkChannelsListener listener )

private Channel openChannel( URI clusterUri )
{
SocketAddress destination = new InetSocketAddress( clusterUri.getHost(),
clusterUri.getPort() == -1 ? config.defaultPort() : clusterUri.getPort() );
// We must specify the origin address in case the server has multiple IPs per interface
SocketAddress origin = new InetSocketAddress( me.getHost(), 0 );
// TODO refactor the creation of InetSocketAddress'es into HostnamePort, so we can be rid of this defaultPort
// method and simplify code a couple of places
SocketAddress address = new InetSocketAddress( clusterUri.getHost(), clusterUri.getPort() == -1 ? config
.defaultPort() : clusterUri.getPort() );

msgLog.info( "Attempting to connect from " + origin + " to " + destination );
ChannelFuture channelFuture = clientBootstrap.connect( destination, origin );
channelFuture.awaitUninterruptibly( 5, TimeUnit.SECONDS );
ChannelFuture channelFuture = clientBootstrap.connect( address );

if ( channelFuture.isSuccess() )
try
{
Channel channel = channelFuture.getChannel();
msgLog.info( "Connected from " + channel.getLocalAddress() + " to " + channel.getRemoteAddress() );
return channel;
if ( channelFuture.await( 5, TimeUnit.SECONDS ) && channelFuture.getChannel().isConnected() )
{
msgLog.info( me + " opened a new channel to " + address );
return channelFuture.getChannel();
}

String msg = "Client could not connect to " + address;
throw new ChannelOpenFailedException( msg );
}
catch ( InterruptedException e )
{
msgLog.warn( "Interrupted", e );
// Restore the interrupt status since we are not rethrowing InterruptedException
// We may be running in an executor and we could fail to be terminated
Thread.currentThread().interrupt();
throw new ChannelOpenFailedException( e );
}

Throwable cause = channelFuture.getCause();
msgLog.info( "Failed to connect to " + destination + " due to: " + cause );

throw new ChannelOpenFailedException( cause );
}

private class NetworkNodePipelineFactory
Expand Down
36 changes: 13 additions & 23 deletions enterprise/com/src/main/java/org/neo4j/com/Client.java
Expand Up @@ -79,8 +79,7 @@ public abstract class Client<T> extends LifecycleAdapter implements ChannelPipel
private static final String MONITORING_CHANNEL_HANDLER_NAME = "monitor";

private ClientBootstrap bootstrap;
private final SocketAddress destination;
private final SocketAddress origin;
private final SocketAddress address;
private final Log msgLog;
private ResourcePool<ChannelContext> channelPool;
private final Protocol protocol;
Expand All @@ -95,9 +94,8 @@ public abstract class Client<T> extends LifecycleAdapter implements ChannelPipel
private final RequestMonitor requestMonitor;
private final LogEntryReader<ReadableClosablePositionAwareChannel> entryReader;

public Client( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId, int frameLength, ProtocolVersion protocolVersion,
long readTimeout, int maxConcurrentChannels, int chunkSize,
public Client( String hostNameOrIp, int port, LogProvider logProvider, StoreId storeId, int frameLength,
ProtocolVersion protocolVersion, long readTimeout, int maxConcurrentChannels, int chunkSize,
ResponseUnpacker responseUnpacker,
ByteCounterMonitor byteCounterMonitor,
RequestMonitor requestMonitor,
Expand All @@ -118,12 +116,11 @@ public Client( String destinationHostNameOrIp, int destinationPort, String origi
// ResourcePool no longer controls max concurrent channels. Use this value for the pool size
this.maxUnusedChannels = maxConcurrentChannels;
this.comExceptionHandler = getNoOpComExceptionHandler();
this.destination = new InetSocketAddress( destinationHostNameOrIp, destinationPort );
origin = originHostNameOrIp == null ? null : new InetSocketAddress( originHostNameOrIp, 0);
this.address = new InetSocketAddress( hostNameOrIp, port );
this.protocol = createProtocol( chunkSize, protocolVersion.getApplicationProtocol() );
this.responseUnpacker = responseUnpacker;

msgLog.info( getClass().getSimpleName() + " communication channel created towards " + destination );
msgLog.info( getClass().getSimpleName() + " communication channel created towards " + address );
}

private ComExceptionHandler getNoOpComExceptionHandler()
Expand Down Expand Up @@ -158,8 +155,8 @@ protected Protocol createProtocol( int chunkSize, byte applicationProtocolVersio
public void start()
{
bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory(
newCachedThreadPool( daemon( getClass().getSimpleName() + "-boss@" + destination ) ),
newCachedThreadPool( daemon( getClass().getSimpleName() + "-worker@" + destination ) ) ) );
newCachedThreadPool( daemon( getClass().getSimpleName() + "-boss@" + address ) ),
newCachedThreadPool( daemon( getClass().getSimpleName() + "-worker@" + address ) ) ) );
bootstrap.setPipelineFactory( this );

channelPool = new ResourcePool<ChannelContext>( maxUnusedChannels,
Expand All @@ -169,26 +166,19 @@ public void start()
@Override
protected ChannelContext create()
{
msgLog.info( threadInfo() + "Trying to open a new channel from " + origin + " to " + destination,
true );
// We must specify the origin address in case the server has multiple IPs per interface
ChannelFuture channelFuture = bootstrap.connect( destination, origin );
ChannelFuture channelFuture = bootstrap.connect( address );
channelFuture.awaitUninterruptibly( 5, TimeUnit.SECONDS );
if ( channelFuture.isSuccess() )
{
msgLog.info( threadInfo() + "Opened a new channel from " +
channelFuture.getChannel().getLocalAddress() + " to " +
channelFuture.getChannel().getRemoteAddress() );
msgLog.info( threadInfo() + "Opened a new channel to " + address );

return new ChannelContext( channelFuture.getChannel(), ChannelBuffers.dynamicBuffer(),
ByteBuffer.allocate( 1024 * 1024 ) );
}

Throwable cause = channelFuture.getCause();
String msg = Client.this.getClass().getSimpleName() + " could not connect from " + origin + " to " +
destination;
String msg = Client.this.getClass().getSimpleName() + " could not connect to " + address;
msgLog.debug( msg, true );
throw traceComException( new ComException( msg, cause ), "Client.start" );
throw traceComException( new ComException( msg, channelFuture.getCause() ), "Client.start" );
}

@Override
Expand Down Expand Up @@ -347,7 +337,7 @@ private ChannelContext acquireChannelContext( RequestType<T> type )
{
if ( channelPool == null )
{
throw new ComException( String.format( "Client for %s is stopped", destination.toString() ) );
throw new ComException( String.format( "Client for %s is stopped", address.toString() ) );
}

// Calling acquire is dangerous since it may be a blocking call... and if this
Expand Down Expand Up @@ -410,6 +400,6 @@ private static BlockingReadHandler<ChannelBuffer> extractBlockingReadHandler( Ch
@Override
public String toString()
{
return getClass().getSimpleName() + "[" + destination + "]";
return getClass().getSimpleName() + "[" + address + "]";
}
}
6 changes: 3 additions & 3 deletions enterprise/com/src/test/java/org/neo4j/com/MadeUpClient.java
Expand Up @@ -19,14 +19,14 @@
*/
package org.neo4j.com;

import org.jboss.netty.buffer.ChannelBuffer;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

import org.jboss.netty.buffer.ChannelBuffer;

import org.neo4j.com.MadeUpServer.MadeUpRequestType;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.ResponseUnpacker;
Expand All @@ -48,7 +48,7 @@ public class MadeUpClient extends Client<MadeUpCommunicationInterface> implement
public MadeUpClient( int port, StoreId storeIdToExpect, byte internalProtocolVersion,
byte applicationProtocolVersion, int chunkSize, ResponseUnpacker responseUnpacker )
{
super( localhost(), port, null, NullLogProvider.getInstance(), storeIdToExpect, FRAME_LENGTH,
super( localhost(), port, NullLogProvider.getInstance(), storeIdToExpect, FRAME_LENGTH,
new ProtocolVersion( applicationProtocolVersion, internalProtocolVersion ),
Client.DEFAULT_READ_RESPONSE_TIMEOUT_SECONDS * 1000,
Client.DEFAULT_MAX_NUMBER_OF_CONCURRENT_CHANNELS_PER_CLIENT,
Expand Down
Expand Up @@ -80,30 +80,26 @@ public class MasterClient210 extends Client<Master> implements MasterClient
private final long lockReadTimeoutMillis;
private final HaRequestTypes requestTypes;

public MasterClient210( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId, long readTimeoutMillis,
public MasterClient210( String hostNameOrIp, int port, LogProvider logProvider, StoreId storeId, long readTimeoutMillis,
long lockReadTimeoutMillis, int maxConcurrentChannels, int chunkSize,
ResponseUnpacker responseUnpacker,
ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor,
LogEntryReader<ReadableClosablePositionAwareChannel> entryReader )
{
super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId,
MasterServer.FRAME_LENGTH, PROTOCOL_VERSION, readTimeoutMillis, maxConcurrentChannels, chunkSize,
responseUnpacker, byteCounterMonitor, requestMonitor, entryReader );
super( hostNameOrIp, port, logProvider, storeId, MasterServer.FRAME_LENGTH, PROTOCOL_VERSION, readTimeoutMillis,
maxConcurrentChannels, chunkSize, responseUnpacker, byteCounterMonitor, requestMonitor, entryReader );
this.lockReadTimeoutMillis = lockReadTimeoutMillis;
this.requestTypes = new HaRequestType210( entryReader );
}

MasterClient210( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId, long readTimeoutMillis,
MasterClient210( String hostNameOrIp, int port, LogProvider logProvider, StoreId storeId, long readTimeoutMillis,
long lockReadTimeoutMillis, int maxConcurrentChannels, int chunkSize,
ProtocolVersion protocolVersion, ResponseUnpacker responseUnpacker,
ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor,
LogEntryReader<ReadableClosablePositionAwareChannel> entryReader )
{
super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId,
MasterServer.FRAME_LENGTH, protocolVersion, readTimeoutMillis, maxConcurrentChannels, chunkSize,
responseUnpacker, byteCounterMonitor, requestMonitor, entryReader );
super( hostNameOrIp, port, logProvider, storeId, MasterServer.FRAME_LENGTH, protocolVersion, readTimeoutMillis,
maxConcurrentChannels, chunkSize, responseUnpacker, byteCounterMonitor, requestMonitor, entryReader );
this.lockReadTimeoutMillis = lockReadTimeoutMillis;
this.requestTypes = new HaRequestType210( entryReader );
}
Expand Down
Expand Up @@ -36,15 +36,13 @@ public class MasterClient214 extends MasterClient210
{
public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 8, INTERNAL_PROTOCOL_VERSION );

public MasterClient214( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId, long readTimeoutSeconds,
public MasterClient214( String hostNameOrIp, int port, LogProvider logProvider, StoreId storeId, long readTimeoutSeconds,
long lockReadTimeout, int maxConcurrentChannels, int chunkSize, ResponseUnpacker unpacker,
ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor,
LogEntryReader<ReadableClosablePositionAwareChannel> entryReader )
{
super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, readTimeoutSeconds,
lockReadTimeout, maxConcurrentChannels, chunkSize, PROTOCOL_VERSION, unpacker, byteCounterMonitor,
requestMonitor, entryReader );
super( hostNameOrIp, port, logProvider, storeId, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels,
chunkSize, PROTOCOL_VERSION, unpacker, byteCounterMonitor, requestMonitor, entryReader );
}

@Override
Expand Down

0 comments on commit 5f855b5

Please sign in to comment.