Skip to content

Commit

Permalink
Procedures to list and kill network connections
Browse files Browse the repository at this point in the history
Added three new procedures:
 * `dbms.listConnections()`
 * `dbms.killConnection('connection-X')`
 * `dbms.killConnections(['connection-X', 'connection-Y', 'connection-Z'])`

The allow listing and termination of HTTP, HTTPS and Bolt connections.
Listing exposes:
 * connection id
 * name of the connector
 * username, if authenticated
 * connect time
 * client and server addresses

These procedures should help users track system activity and
investigate connectivity problems.
  • Loading branch information
lutovich authored and zhenlineo committed Jul 24, 2018
1 parent 3992f02 commit e70eee9
Show file tree
Hide file tree
Showing 75 changed files with 1,969 additions and 570 deletions.
67 changes: 48 additions & 19 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltChannel.java
Expand Up @@ -24,25 +24,25 @@
import java.net.SocketAddress;

import org.neo4j.bolt.logging.BoltMessageLogger;
import org.neo4j.kernel.api.net.TrackedNetworkConnection;

/**
* A channel through which Bolt messaging can occur.
*/
public class BoltChannel implements AutoCloseable, BoltConnectionDescriptor
public class BoltChannel implements AutoCloseable, TrackedNetworkConnection, BoltConnectionDescriptor
{
private final String id;
private final long connectTime;
private final String connector;
private final Channel rawChannel;
private final BoltMessageLogger messageLogger;

public static BoltChannel open( String connector, Channel rawChannel,
BoltMessageLogger messageLogger )
{
return new BoltChannel( connector, rawChannel, messageLogger );
}
private volatile String user;

private BoltChannel( String connector, Channel rawChannel,
BoltMessageLogger messageLogger )
public BoltChannel( String id, String connector, Channel rawChannel, BoltMessageLogger messageLogger )
{
this.id = id;
this.connectTime = System.currentTimeMillis();
this.connector = connector;
this.rawChannel = rawChannel;
this.messageLogger = messageLogger;
Expand All @@ -60,20 +60,15 @@ public BoltMessageLogger log()
}

@Override
public void close()
public String id()
{
Channel rawChannel = rawChannel();
if ( rawChannel.isOpen() )
{
messageLogger.serverEvent( "CLOSE" );
rawChannel.close().syncUninterruptibly();
}
return id;
}

@Override
public String id()
public long connectTime()
{
return rawChannel().id().asLongText();
return connectTime;
}

@Override
Expand All @@ -82,16 +77,50 @@ public String connector()
return connector;
}

@Override
public SocketAddress serverAddress()
{
return rawChannel.localAddress();
}

@Override
public SocketAddress clientAddress()
{
return rawChannel.remoteAddress();
}

@Override
public SocketAddress serverAddress()
public String user()
{
return rawChannel.localAddress();
return user;
}

@Override
public void updateUser( String user )
{
this.user = user;
}

@Override
public void close()
{
Channel rawChannel = rawChannel();
if ( rawChannel.isOpen() )
{
messageLogger.serverEvent( "CLOSE" );
rawChannel.close().syncUninterruptibly();
}
}

@Override
public String toString()
{
return "BoltChannel{" +
"id='" + id + '\'' +
", connectTime=" + connectTime +
", connector='" + connector + '\'' +
", rawChannel=" + rawChannel +
", user='" + user + '\'' +
'}';
}
}
13 changes: 7 additions & 6 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java
Expand Up @@ -48,7 +48,7 @@
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.net.NetworkConnectionTracker;
import org.neo4j.kernel.api.security.AuthManager;
import org.neo4j.kernel.api.security.UserManagerSupplier;
import org.neo4j.kernel.configuration.BoltConnector;
Expand Down Expand Up @@ -76,6 +76,7 @@ public class BoltServer extends LifecycleAdapter
private final JobScheduler jobScheduler;
private final AvailabilityGuard availabilityGuard;
private final ConnectorPortRegister connectorPortRegister;
private final NetworkConnectionTracker connectionTracker;
private final UsageData usageData;
private final Config config;
private final Clock clock;
Expand All @@ -88,14 +89,15 @@ public class BoltServer extends LifecycleAdapter
private final LifeSupport life = new LifeSupport();

public BoltServer( GraphDatabaseAPI db, FileSystemAbstraction fs, JobScheduler jobScheduler, AvailabilityGuard availabilityGuard,
ConnectorPortRegister connectorPortRegister, UsageData usageData, Config config, Clock clock, Monitors monitors,
LogService logService, DependencyResolver dependencyResolver )
ConnectorPortRegister connectorPortRegister, NetworkConnectionTracker connectionTracker, UsageData usageData, Config config,
Clock clock, Monitors monitors, LogService logService, DependencyResolver dependencyResolver )
{
this.db = db;
this.fs = fs;
this.jobScheduler = jobScheduler;
this.availabilityGuard = availabilityGuard;
this.connectorPortRegister = connectorPortRegister;
this.connectionTracker = connectionTracker;
this.usageData = usageData;
this.config = config;
this.clock = clock;
Expand Down Expand Up @@ -198,7 +200,7 @@ private ProtocolInitializer createProtocolInitializer( BoltConnector connector,

ListenSocketAddress listenAddress = config.get( connector.listen_address );
return new SocketTransport( connector.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging,
throttleGroup, boltProtocolFactory );
throttleGroup, boltProtocolFactory, connectionTracker );
}

private static SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config )
Expand Down Expand Up @@ -233,7 +235,6 @@ private BoltProtocolFactory createBoltProtocolFactory( BoltConnectionFactory con

private BoltStateMachineFactory createBoltFactory( Authentication authentication, Clock clock )
{
BoltConnectionTracker connectionTracker = dependencyResolver.resolveDependency( BoltConnectionTracker.class );
return new BoltStateMachineFactoryImpl( db, usageData, availabilityGuard, authentication, connectionTracker, clock, config, logService );
return new BoltStateMachineFactoryImpl( db, usageData, availabilityGuard, authentication, clock, config, logService );
}
}
Expand Up @@ -41,6 +41,11 @@ private BoltMessageLogging( BoltMessageLog boltMessageLog )
this.boltMessageLog = boltMessageLog;
}

public static BoltMessageLogging noop()
{
return new BoltMessageLogging( null );
}

public static BoltMessageLogging create( FileSystemAbstraction fs, JobScheduler scheduler, Config config, Log log )
{
return new BoltMessageLogging( createBoltMessageLog( fs, scheduler, config, log ) );
Expand Down
Expand Up @@ -21,9 +21,8 @@

import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine;

public interface BoltStateMachine extends ManagedBoltStateMachine, AutoCloseable
public interface BoltStateMachine extends AutoCloseable
{
void process( RequestMessage message, BoltResponseHandler handler ) throws BoltConnectionFatality;

Expand All @@ -43,6 +42,8 @@ public interface BoltStateMachine extends ManagedBoltStateMachine, AutoCloseable

void handleExternalFailure( Neo4jError error, BoltResponseHandler handler ) throws BoltConnectionFatality;

void terminate();

boolean isClosed();

@Override
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.bolt.v3.BoltStateMachineV3;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
Expand All @@ -45,19 +44,17 @@ public class BoltStateMachineFactoryImpl implements BoltStateMachineFactory
private final AvailabilityGuard availabilityGuard;
private final LogService logging;
private final Authentication authentication;
private final BoltConnectionTracker connectionTracker;
private final Config config;
private final Clock clock;

public BoltStateMachineFactoryImpl( GraphDatabaseAPI db, UsageData usageData, AvailabilityGuard availabilityGuard,
Authentication authentication, BoltConnectionTracker connectionTracker, Clock clock, Config config, LogService logging )
Authentication authentication, Clock clock, Config config, LogService logging )
{
this.db = db;
this.usageData = usageData;
this.availabilityGuard = availabilityGuard;
this.logging = logging;
this.authentication = authentication;
this.connectionTracker = connectionTracker;
this.config = config;
this.clock = clock;
}
Expand All @@ -82,7 +79,7 @@ else if ( protocolVersion == BoltProtocolV3.VERSION )
private BoltStateMachine newStateMachineV1( BoltChannel boltChannel )
{
TransactionStateMachineSPI transactionSPI = createTxSpi( clock );
BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, connectionTracker, transactionSPI );
BoltStateMachineSPI boltSPI = new BoltStateMachineV1SPI( boltChannel, usageData, logging, authentication, transactionSPI );
return new BoltStateMachineV1( boltSPI, boltChannel, clock );
}

Expand Down
Expand Up @@ -29,12 +29,8 @@ public interface BoltStateMachineSPI
{
BoltConnectionDescriptor connectionDescriptor();

void register( BoltStateMachine machine, String owner );

TransactionStateMachineSPI transactionSpi();

void onTerminate( BoltStateMachine machine );

void reportError( Neo4jError err );

AuthenticationResult authenticate( Map<String,Object> authToken ) throws AuthenticationException;
Expand Down
Expand Up @@ -28,7 +28,6 @@
*/
public class MutableConnectionState implements BoltResponseHandler
{
private String owner;
private Neo4jError pendingError;
private boolean pendingIgnore;
private volatile boolean terminated;
Expand Down Expand Up @@ -104,16 +103,6 @@ public void onFinish()
}
}

public String getOwner()
{
return owner;
}

public void setOwner( String owner )
{
this.owner = owner;
}

public Neo4jError getPendingError()
{
return pendingError;
Expand Down
Expand Up @@ -23,7 +23,7 @@

public interface StateMachineContext
{
void registerMachine( String owner );
void authenticationCompleted( String user );

void handleFailure( Throwable cause, boolean fatal ) throws BoltConnectionFatality;

Expand Down
Expand Up @@ -21,6 +21,7 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class NettyServer extends LifecycleAdapter
*/
public interface ProtocolInitializer
{
ChannelInitializer<io.netty.channel.socket.SocketChannel> channelInitializer();
ChannelInitializer<Channel> channelInitializer();
ListenSocketAddress address();
}

Expand Down
Expand Up @@ -20,12 +20,14 @@
package org.neo4j.bolt.transport;

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.logging.BoltMessageLogging;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.api.net.NetworkConnectionTracker;
import org.neo4j.logging.LogProvider;

/**
Expand All @@ -41,11 +43,11 @@ public class SocketTransport implements NettyServer.ProtocolInitializer
private final BoltMessageLogging boltLogging;
private final TransportThrottleGroup throttleGroup;
private final BoltProtocolFactory boltProtocolFactory;
private final NetworkConnectionTracker connectionTracker;

public SocketTransport( String connector, ListenSocketAddress address, SslContext sslCtx, boolean encryptionRequired,
LogProvider logging, BoltMessageLogging boltLogging,
TransportThrottleGroup throttleGroup,
BoltProtocolFactory boltProtocolFactory )
LogProvider logging, BoltMessageLogging boltLogging, TransportThrottleGroup throttleGroup,
BoltProtocolFactory boltProtocolFactory, NetworkConnectionTracker connectionTracker )
{
this.connector = connector;
this.address = address;
Expand All @@ -55,26 +57,31 @@ public SocketTransport( String connector, ListenSocketAddress address, SslContex
this.boltLogging = boltLogging;
this.throttleGroup = throttleGroup;
this.boltProtocolFactory = boltProtocolFactory;
this.connectionTracker = connectionTracker;
}

@Override
public ChannelInitializer<SocketChannel> channelInitializer()
public ChannelInitializer<Channel> channelInitializer()
{
return new ChannelInitializer<SocketChannel>()
return new ChannelInitializer<Channel>()
{
@Override
public void initChannel( SocketChannel ch )
public void initChannel( Channel ch )
{
ch.config().setAllocator( PooledByteBufAllocator.DEFAULT );

BoltChannel boltChannel = newBoltChannel( ch );
connectionTracker.add( boltChannel );
ch.closeFuture().addListener( future -> connectionTracker.remove( boltChannel ) );

// install throttles
throttleGroup.install( ch );

// add a close listener that will uninstall throttles
ch.closeFuture().addListener( future -> throttleGroup.uninstall( ch ) );

TransportSelectionHandler transportSelectionHandler = new TransportSelectionHandler( connector, sslCtx,
encryptionRequired, false, logging, boltProtocolFactory, boltLogging );
TransportSelectionHandler transportSelectionHandler = new TransportSelectionHandler( boltChannel, sslCtx,
encryptionRequired, false, logging, boltProtocolFactory );

ch.pipeline().addLast( transportSelectionHandler );
}
Expand All @@ -86,4 +93,9 @@ public ListenSocketAddress address()
{
return address;
}

private BoltChannel newBoltChannel( Channel ch )
{
return new BoltChannel( connectionTracker.newConnectionId( connector ), connector, ch, boltLogging.newLogger( ch ) );
}
}

0 comments on commit e70eee9

Please sign in to comment.