Skip to content

Commit

Permalink
Log helpful message in user log when unable to bind to CE ports.
Browse files Browse the repository at this point in the history
Currently, errors when binding to an address result only in a stack trace in
the internal log. We still log that, but now also log a readable
message that says which port and which setting is the one with the bind
issue. This helps in debugging errors when setting up CE.
  • Loading branch information
Max Sumrall committed Sep 14, 2016
1 parent 473ceba commit dc23ed1
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 36 deletions.
Expand Up @@ -32,6 +32,7 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.net.BindException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand All @@ -48,15 +49,18 @@
import org.neo4j.coreedge.catchup.tx.TxPullRequestHandler;
import org.neo4j.coreedge.catchup.tx.TxPullResponseEncoder;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseEncoder;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.state.CoreState;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotEncoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequest;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestHandler;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
Expand All @@ -67,8 +71,11 @@

public class CatchupServer extends LifecycleAdapter
{
private static final Setting<ListenSocketAddress> setting = CoreEdgeClusterSettings.transaction_listen_address;
private final LogProvider logProvider;
private Monitors monitors;
private final Log log;
private final Log userLog;
private final Monitors monitors;

private final Supplier<StoreId> storeIdSupplier;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
Expand All @@ -82,25 +89,21 @@ public class CatchupServer extends LifecycleAdapter
private EventLoopGroup workerGroup;
private Channel channel;
private Supplier<CheckPointer> checkPointerSupplier;
private Log log;

public CatchupServer( LogProvider logProvider,
Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier,
Supplier<CheckPointer> checkPointerSupplier,
CoreState coreState,
ListenSocketAddress listenAddress, Monitors monitors )

public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, Supplier<CheckPointer> checkPointerSupplier, CoreState coreState,
Config config, Monitors monitors )
{
this.coreState = coreState;
this.listenAddress = listenAddress;
this.listenAddress = config.get( setting );
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.storeIdSupplier = storeIdSupplier;
this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier;
this.logProvider = logProvider;
this.monitors = monitors;
this.log = logProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
this.dataSourceSupplier = dataSourceSupplier;
this.checkPointerSupplier = checkPointerSupplier;
}
Expand All @@ -117,7 +120,7 @@ public synchronized void start() throws Throwable
.childHandler( new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel( SocketChannel ch ) throws Exception
protected void initChannel( SocketChannel ch )
{
CatchupServerProtocol protocol = new CatchupServerProtocol();

Expand Down Expand Up @@ -153,7 +156,22 @@ protected void initChannel( SocketChannel ch ) throws Exception
}
} );

channel = bootstrap.bind().syncUninterruptibly().channel();
try
{
channel = bootstrap.bind().syncUninterruptibly().channel();
}
catch( Exception e )
{
// thanks to netty we need to catch everything and do an instanceof because it does not declare properly
// checked exception but it still throws them with some black magic at runtime.
//noinspection ConstantConditions
if ( e instanceof BindException )
{
userLog.error( "Address is already bound for setting: " + setting + " with value: " + listenAddress );
log.error( "Address is already bound for setting: " + setting + " with value: " + listenAddress, e );
throw e;
}
}
}

private ChannelInboundHandler decoders( CatchupServerProtocol protocol )
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.coreedge.core.consensus;

import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -35,15 +33,21 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.net.BindException;
import java.util.concurrent.TimeUnit;

import org.neo4j.coreedge.VersionDecoder;
import org.neo4j.coreedge.VersionPrepender;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.replication.ReplicatedContent;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.Inbound;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.coreedge.messaging.marshalling.ChannelMarshal;
import org.neo4j.coreedge.messaging.marshalling.RaftMessageDecoder;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand All @@ -52,23 +56,26 @@

public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.StoreIdAwareMessage>
{
private final ListenSocketAddress listenAddress;
private static final Setting<ListenSocketAddress> setting = CoreEdgeClusterSettings.raft_listen_address;
private final Log log;
private final Log userLog;
private final LogProvider logProvider;
private final ChannelMarshal<ReplicatedContent> marshal;
private final ListenSocketAddress listenAddress;
private MessageHandler<RaftMessages.StoreIdAwareMessage> messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );

public RaftServer( ChannelMarshal<ReplicatedContent> marshal, ListenSocketAddress listenAddress,
LogProvider logProvider )
public RaftServer( ChannelMarshal<ReplicatedContent> marshal, Config config, LogProvider logProvider,
LogProvider userLogProvider )
{
this.marshal = marshal;
this.listenAddress = listenAddress;
this.listenAddress = config.get( setting );
this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
}

@Override
Expand Down Expand Up @@ -126,7 +133,22 @@ protected void initChannel( SocketChannel ch ) throws Exception
}
} );

channel = bootstrap.bind().syncUninterruptibly().channel();
try
{
channel = bootstrap.bind().syncUninterruptibly().channel();
}
catch ( Exception e )
{
// thanks to netty we need to catch everything and do an instanceof because it does not declare properly
// checked exception but it still throws them with some black magic at runtime.
//noinspection ConstantConditions
if ( e instanceof BindException )
{
userLog.error( "Address is already bound for setting: " + setting + " with value: " + listenAddress );
log.error( "Address is already bound for setting: " + setting + " with value: " + listenAddress, e );
throw e;
}
}
}

@Override
Expand Down
Expand Up @@ -85,6 +85,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
final FileSystemAbstraction fileSystem = platformModule.fileSystem;
final LifeSupport life = platformModule.life;
LogProvider logProvider = logging.getInternalLogProvider();
LogProvider userLogProvider = logging.getUserLogProvider();

final Supplier<DatabaseHealth> databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class );

Expand All @@ -97,9 +98,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C

consensusModule.raftMembershipManager().setRecoverFromIndexSupplier( lastFlushedStorage::getInitialState );

ListenSocketAddress raftListenAddress = config.get( CoreEdgeClusterSettings.raft_listen_address );

RaftServer raftServer = new RaftServer( new CoreReplicatedContentMarshal(), raftListenAddress, logProvider );
RaftServer raftServer = new RaftServer( new CoreReplicatedContentMarshal(), config, logProvider, userLogProvider );

LoggingInbound<RaftMessages.StoreIdAwareMessage> loggingRaftInbound =
new LoggingInbound<>( raftServer, messageLogger, myself );
Expand Down Expand Up @@ -150,11 +149,11 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C

loggingRaftInbound.registerHandler( batchingMessageHandler );

CatchupServer catchupServer = new CatchupServer( logProvider, localDatabase,
CatchupServer catchupServer = new CatchupServer( logProvider, userLogProvider, localDatabase,
platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ),
new DataSourceSupplier( platformModule ), new CheckpointerSupplier( platformModule.dependencies ),
coreState, config.get( CoreEdgeClusterSettings.transaction_listen_address ), platformModule.monitors );
coreState, config, platformModule.monitors );

life.add( coreState );
life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ),
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.LogProvider;

public interface DiscoveryServiceFactory
Expand Down
Expand Up @@ -19,24 +19,26 @@
*/
package org.neo4j.coreedge.discovery;

import java.util.List;

import com.hazelcast.config.InterfacesConfig;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import com.hazelcast.instance.GroupProperties;
import com.hazelcast.instance.GroupProperty;

import java.util.List;

import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -49,8 +51,8 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
private final Config config;
private final MemberId myself;
private final Log log;
private final CoreTopologyListenerService listenerService;
private final Log userLog;
private final CoreTopologyListenerService listenerService;
private String membershipRegistrationId;

private HazelcastInstance hazelcastInstance;
Expand Down Expand Up @@ -141,13 +143,14 @@ private HazelcastInstance createHazelcastInstance()
log.info( "Discovering cluster with initial members: " + initialMembers );

NetworkConfig networkConfig = new NetworkConfig();
ListenSocketAddress hazelcastAddress = config.get( CoreEdgeClusterSettings.discovery_listen_address );
Setting<ListenSocketAddress> discovery_listen_address = CoreEdgeClusterSettings.discovery_listen_address;
ListenSocketAddress hazelcastAddress = config.get( discovery_listen_address );
InterfacesConfig interfaces = new InterfacesConfig();
interfaces.addInterface( hazelcastAddress.getHostname() );
networkConfig.setInterfaces( interfaces );
networkConfig.setPort( hazelcastAddress.getPort() );
networkConfig.setJoin( joinConfig );

networkConfig.setPortAutoIncrement( false );
com.hazelcast.config.Config c = new com.hazelcast.config.Config();
c.setProperty( GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS, "10000" );
c.setProperty( GroupProperties.PROP_INITIAL_MIN_CLUSTER_SIZE,
Expand All @@ -160,7 +163,20 @@ private HazelcastInstance createHazelcastInstance()

c.setMemberAttributeConfig( memberAttributeConfig );
userLog.info( "Waiting for other members to join cluster before continuing..." );
return Hazelcast.newHazelcastInstance( c );
try
{
hazelcastInstance = Hazelcast.newHazelcastInstance( c );
}
catch ( HazelcastException e )
{
String errorMessage = String.format( "Hazelcast was unable to start with setting: %s = %s",
discovery_listen_address.name(), config.get( discovery_listen_address ) );
userLog.error( errorMessage );
log.error( errorMessage, e );
throw new RuntimeException( e );
}

return hazelcastInstance;
}

private Integer minimumClusterSizeThatCanTolerateOneFaultForExpectedClusterSize()
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.LogProvider;

public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.coreedge.edge;

import java.io.File;
import java.io.IOException;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -52,7 +51,6 @@
import org.neo4j.kernel.DatabaseAvailability;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.ReadOnlyTransactionCommitProcess;
Expand Down
Expand Up @@ -35,14 +35,15 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.LogProvider;

import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;

public class SharedDiscoveryService implements DiscoveryServiceFactory
{
private final Map<MemberId, CoreAddresses> coreMembers = new HashMap<>( );
private final Map<MemberId,CoreAddresses> coreMembers = new HashMap<>();
private final Set<EdgeAddresses> edgeAddresses = new HashSet<>();
private final List<SharedDiscoveryCoreClient> coreClients = new ArrayList<>();

Expand All @@ -61,7 +62,9 @@ public CoreTopologyService coreTopologyService( Config config, MemberId myself,
}

@Override
public TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate )
public TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress,
LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout,
long edgeRefreshRate )
{
return new SharedDiscoveryEdgeClient( this, boltAddress, logProvider );
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.logging.NullLogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down

0 comments on commit dc23ed1

Please sign in to comment.