Skip to content

Commit

Permalink
Merge pull request #9554 from MishaDemianenko/3.3-store-local-bolt
Browse files Browse the repository at this point in the history
Dynamic binding of bolt connector to available ports
  • Loading branch information
MishaDemianenko committed Jun 26, 2017
2 parents ba52eb4 + 7d0ebef commit 5726570
Show file tree
Hide file tree
Showing 25 changed files with 313 additions and 270 deletions.
11 changes: 0 additions & 11 deletions community/bolt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,6 @@
</license>
</licenses>

<build>
<plugins>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.neo4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.security.auth.BasicAuthentication;
Expand All @@ -51,6 +52,7 @@
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.bolt.BoltPortRegister;
import org.neo4j.kernel.api.security.AuthManager;
import org.neo4j.kernel.api.security.UserManagerSupplier;
import org.neo4j.kernel.configuration.BoltConnector;
Expand All @@ -60,20 +62,19 @@
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.udc.UsageData;

import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.neo4j.scheduler.JobScheduler.Groups.boltNetworkIO;
import static org.neo4j.kernel.configuration.Settings.STRING;
import static org.neo4j.kernel.configuration.Settings.setting;
import static org.neo4j.kernel.configuration.ssl.LegacySslPolicyConfig.LEGACY_POLICY_NAME;
import static org.neo4j.scheduler.JobScheduler.Groups.boltNetworkIO;

/**
* Wraps Bolt and exposes it as a Kernel Extension.
Expand Down Expand Up @@ -105,6 +106,8 @@ public interface Dependencies

BoltConnectionTracker sessionTracker();

BoltPortRegister connectionRegister();

Clock clock();

AuthManager authManager();
Expand Down Expand Up @@ -141,9 +144,10 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
BoltFactory boltFactory = life.add( new BoltFactoryImpl( api, dependencies.usageData(),
logService, dependencies.txBridge(), authentication, dependencies.sessionTracker(), config ) );
WorkerFactory workerFactory = createWorkerFactory( boltFactory, scheduler, dependencies, logService, clock );
BoltPortRegister connectionRegister = dependencies.connectionRegister();

List<ProtocolInitializer> connectors = config.enabledBoltConnectors().stream()
.map( ( connConfig ) ->
Map<BoltConnector, ProtocolInitializer> connectors = config.enabledBoltConnectors().stream()
.collect( Collectors.toMap( Function.identity(), connConfig ->
{
ListenSocketAddress listenAddress = config.get( connConfig.listen_address );
SslContext sslCtx;
Expand Down Expand Up @@ -180,14 +184,13 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
final Map<Long, BiFunction<Channel, Boolean, BoltProtocol>> versions =
newVersions( logService, workerFactory );
return new SocketTransport( listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), versions );
} )
.collect( toList() );
} ) );

if ( connectors.size() > 0 && !config.get( GraphDatabaseSettings.disconnected ) )
{
life.add( new NettyServer( scheduler.threadFactory( boltNetworkIO ), connectors ) );
life.add( new NettyServer( scheduler.threadFactory( boltNetworkIO ), connectors, connectionRegister ) );
log.info( "Bolt Server extension loaded." );
for ( ProtocolInitializer connector : connectors )
for ( ProtocolInitializer connector : connectors.values() )
{
logService.getUserLog( WorkerFactory.class ).info( "Bolt enabled on %s.", connector.address() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.BindException;
import java.util.Collection;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ThreadFactory;

import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.PortBindException;
import org.neo4j.kernel.api.bolt.BoltPortRegister;
import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

/**
Expand All @@ -46,8 +50,9 @@ public class NettyServer extends LifecycleAdapter
private static final int NUM_SELECTOR_THREADS = Math.max( 1, Integer.getInteger(
"org.neo4j.selectorThreads", Runtime.getRuntime().availableProcessors() * 2 ) );

private final Collection<ProtocolInitializer> bootstrappers;
private final Map<BoltConnector, ProtocolInitializer> bootstrappersMap;
private final ThreadFactory tf;
private final BoltPortRegister connectionRegister;
private EventLoopGroup bossGroup;
private EventLoopGroup selectorGroup;

Expand All @@ -62,12 +67,15 @@ public interface ProtocolInitializer

/**
* @param tf used to create IO threads to listen and handle network events
* @param initializers functions that bootstrap protocols we should support
* @param initializersMap function per bolt connector map to bootstrap configured protocols
* @param connectorRegister register to keep local address information on all configured connectors
*/
public NettyServer( ThreadFactory tf, Collection<ProtocolInitializer> initializers )
public NettyServer( ThreadFactory tf, Map<BoltConnector, ProtocolInitializer> initializersMap,
BoltPortRegister connectorRegister )
{
this.bootstrappers = initializers;
this.bootstrappersMap = initializersMap;
this.tf = tf;
this.connectionRegister = connectorRegister;
}

@Override
Expand All @@ -87,17 +95,19 @@ public void start() throws Throwable

// Bootstrap the various ports and protocols we want to handle

for ( ProtocolInitializer initializer : bootstrappers )
for ( Map.Entry<BoltConnector, ProtocolInitializer> bootstrapEntry : bootstrappersMap.entrySet() )
{
try
{
new ServerBootstrap()
.option( ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT )
.group( bossGroup, selectorGroup )
.channel( NioServerSocketChannel.class )
.childHandler( initializer.channelInitializer() )
.bind( initializer.address().socketAddress() )
.sync();
ProtocolInitializer protocolInitializer = bootstrapEntry.getValue();
BoltConnector boltConnector = bootstrapEntry.getKey();
ChannelFuture channelFuture =
new ServerBootstrap().option( ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT )
.group( bossGroup, selectorGroup ).channel( NioServerSocketChannel.class )
.childHandler( protocolInitializer.channelInitializer() )
.bind( protocolInitializer.address().socketAddress() ).sync();
InetSocketAddress localAddress = (InetSocketAddress) channelFuture.channel().localAddress();
connectionRegister.register( boltConnector.key(), localAddress );
}
catch ( Throwable e )
{
Expand All @@ -107,7 +117,7 @@ public void start() throws Throwable

if ( e instanceof BindException )
{
throw new PortBindException( initializer.address(), (BindException) e );
throw new PortBindException( bootstrapEntry.getValue().address(), (BindException) e );
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,26 @@

import static java.util.Collections.emptyMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket.DEFAULT_CONNECTOR_KEY;
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyDisconnects;
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives;
import static org.neo4j.kernel.configuration.BoltConnector.EncryptionLevel.REQUIRED;

public class BoltConfigIT
{
private static final String ANOTHER_CONNECTOR_KEY = "1";

@Rule
public Neo4jWithSocket server = new Neo4jWithSocket( getClass(),
settings ->
{
settings.put( new BoltConnector( "bolt" ).type.name(), "BOLT" );
settings.put( new BoltConnector( "bolt" ).enabled.name(), "true" );
settings.put( new BoltConnector( "bolt" ).address.name(), "localhost:7888" );
settings.put( new BoltConnector( "1" ).type.name(), "BOLT" );
settings.put( new BoltConnector( "1" ).enabled.name(), "true" );
settings.put( new BoltConnector( "1" ).address.name(), "localhost:7687" );
settings.put( new BoltConnector( "1" ).encryption_level.name(), REQUIRED.name() );
settings.put( new BoltConnector( DEFAULT_CONNECTOR_KEY ).type.name(), "BOLT" );
settings.put( new BoltConnector( DEFAULT_CONNECTOR_KEY ).enabled.name(), "true" );
settings.put( new BoltConnector( DEFAULT_CONNECTOR_KEY ).address.name(), "localhost:0" );
settings.put( new BoltConnector( ANOTHER_CONNECTOR_KEY ).type.name(), "BOLT" );
settings.put( new BoltConnector( ANOTHER_CONNECTOR_KEY ).enabled.name(), "true" );
settings.put( new BoltConnector( ANOTHER_CONNECTOR_KEY ).address.name(), "localhost:0" );
settings.put( new BoltConnector( ANOTHER_CONNECTOR_KEY ).encryption_level.name(), REQUIRED.name() );
} );
@Rule
public SuppressOutput suppressOutput = SuppressOutput.suppressAll();
Expand All @@ -64,13 +67,13 @@ public void shouldSupportMultipleConnectors() throws Throwable
// When

// Then
HostnamePort address0 = new HostnamePort( "localhost:7888" );
HostnamePort address0 = server.lookupConnector( DEFAULT_CONNECTOR_KEY );
assertConnectionAccepted( address0, new WebSocketConnection() );
assertConnectionAccepted( address0, new SecureWebSocketConnection() );
assertConnectionAccepted( address0, new SocketConnection() );
assertConnectionAccepted( address0, new SecureSocketConnection() );

HostnamePort address1 = new HostnamePort( "localhost:7687" );
HostnamePort address1 = server.lookupConnector( ANOTHER_CONNECTOR_KEY );
assertConnectionRejected( address1, new WebSocketConnection() );
assertConnectionAccepted( address1, new SecureWebSocketConnection() );
assertConnectionRejected( address1, new SocketConnection() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@

import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.Map;

import org.neo4j.bolt.transport.NettyServer;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.PortBindException;
import org.neo4j.kernel.api.bolt.BoltPortRegister;
import org.neo4j.kernel.configuration.BoltConnector;

import static java.util.Arrays.asList;
import static org.neo4j.helpers.collection.MapUtil.genericMap;

public class NettyServerTest
{
Expand All @@ -54,7 +57,11 @@ public void shouldGivePortConflictErrorWithPortNumberInIt() throws Throwable
exception.expectMessage( "Address localhost:16000 is already in use" );

// When
new NettyServer( new NamedThreadFactory( "mythreads" ), asList( protocolOnAddress( address ) ) ).start();
Map<BoltConnector,NettyServer.ProtocolInitializer> initializersMap =
genericMap( new BoltConnector( "test" ), protocolOnAddress( address ) );
new NettyServer( new NamedThreadFactory( "mythreads" ), initializersMap,
new BoltPortRegister() ).start();

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@
public class AuthenticationIT
{
protected EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
protected Neo4jWithSocket server = new Neo4jWithSocket( getClass(), getTestGraphDatabaseFactory(),
fsRule::get, getSettingsFunction() );
protected Neo4jWithSocket server = new Neo4jWithSocket( getClass(), getTestGraphDatabaseFactory(), fsRule, getSettingsFunction() );

@Rule
public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server );
Expand All @@ -89,35 +88,34 @@ protected Consumer<Map<String, String>> getSettingsFunction()
return settings -> settings.put( GraphDatabaseSettings.auth_enabled.name(), "true" );
}

@Parameterized.Parameter( 0 )
@Parameterized.Parameter
public Factory<TransportConnection> cf;

@Parameterized.Parameter( 1 )
public HostnamePort address;

protected TransportConnection client;
private HostnamePort address;
private TransportConnection client;
private final String version = "Neo4j/" + Version.getNeo4jVersion();

@Parameterized.Parameters
public static Collection<Object[]> transports()
public static Collection<Factory<TransportConnection>> transports()
{
return asList( SocketConnection::new, WebSocketConnection::new, SecureSocketConnection::new,
SecureWebSocketConnection::new );
}

@Before
public void setup() throws IOException
{
this.client = cf.newInstance();
this.address = server.lookupDefaultConnector();
}

@After
public void teardown() throws Exception
{
return asList(
new Object[]{
(Factory<TransportConnection>) SocketConnection::new,
new HostnamePort( "localhost:7687" )
},
new Object[]{
(Factory<TransportConnection>) WebSocketConnection::new,
new HostnamePort( "localhost:7687" )
},
new Object[]{
(Factory<TransportConnection>) SecureSocketConnection::new,
new HostnamePort( "localhost:7687" )
},
new Object[]{
(Factory<TransportConnection>) SecureWebSocketConnection::new,
new HostnamePort( "localhost:7687" )
} );
if ( client != null )
{
client.disconnect();
}
}

@Test
Expand Down Expand Up @@ -529,21 +527,6 @@ public void shouldNotBeAbleToReadWhenPasswordChangeRequired() throws Throwable
assertThat( client, eventuallyDisconnects() );
}

@Before
public void setup() throws IOException
{
this.client = cf.newInstance();
}

@After
public void teardown() throws Exception
{
if ( client != null )
{
client.disconnect();
}
}

private void reconnect() throws Exception
{
if ( client != null )
Expand Down

0 comments on commit 5726570

Please sign in to comment.