New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify BOLT into a single port #6020

Merged
merged 1 commit into from Dec 16, 2015
Jump to file or symbol
Failed to load files and symbols.
+1,038 −359
Diff settings

Always

Just for now

@@ -24,13 +24,16 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.neo4j.bolt.transport.BoltProtocol;
import org.neo4j.bolt.transport.NettyServer;
import org.neo4j.bolt.transport.SocketTransport;
import org.neo4j.bolt.transport.WebSocketTransport;
import org.neo4j.bolt.v1.runtime.Sessions;
import org.neo4j.bolt.v1.runtime.internal.EncryptionRequiredSessions;
import org.neo4j.bolt.v1.runtime.internal.StandardSessions;
import org.neo4j.bolt.v1.runtime.internal.concurrent.ThreadedSessions;
import org.neo4j.bolt.v1.transport.BoltProtocolV1;
@@ -42,6 +45,8 @@
import org.neo4j.helpers.Service;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.ConfigValues;
import org.neo4j.kernel.configuration.ConfigView;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.JobScheduler;
@@ -50,10 +55,11 @@
import org.neo4j.logging.Log;
import org.neo4j.udc.UsageData;
import static java.util.Arrays.asList;
import static org.neo4j.bolt.BoltKernelExtension.EncryptionLevel.OPTIONAL;
import static org.neo4j.collection.primitive.Primitive.longObjectMap;
import static org.neo4j.helpers.Settings.BOOLEAN;
import static org.neo4j.helpers.Settings.HOSTNAME_PORT;
import static org.neo4j.helpers.Settings.options;
import static org.neo4j.helpers.Settings.setting;
import static org.neo4j.kernel.impl.util.JobScheduler.Groups.boltNetworkIO;
@@ -65,21 +71,67 @@
{
public static class Settings
{
public static final Function<ConfigValues,List<ConfigView>> connector_group = Config.groups( "dbms.connector" );
@Description( "Enable Neo4j Bolt" )
public static final Setting<Boolean> enabled =
setting( "dbms.bolt.enabled", BOOLEAN, "false" );
setting( "enabled", BOOLEAN, "false" );
@Description( "Enable Neo4j encryption for Bolt protocol ports" )
public static final Setting<Boolean> tls_enabled =
setting( "dbms.bolt.tls.enabled", BOOLEAN, "false" );
@Description( "Set the encryption level for Neo4j Bolt protocol ports" )
public static final Setting<EncryptionLevel> tls_level =
setting( "tls.level", options( EncryptionLevel.class ),
OPTIONAL.name() );
@Description( "Host and port for the Neo4j Bolt Protocol" )
public static final Setting<HostnamePort> socket_address =
setting( "dbms.bolt.socket.address", HOSTNAME_PORT, "localhost:7687" );
setting( "address", HOSTNAME_PORT, "localhost:7687" );
public static <T> Setting<T> connector( int i, Setting<T> setting )
{
String name = String.format( "dbms.connector.%s", i );
return new Setting<T>()
{
@Override
public String name()
{
return String.format( "%s.%s", name, setting.name() );
}
@Override
public String getDefaultValue()
{
return setting.getDefaultValue();
}
@Override
public T apply( Function<String,String> settings )
{
return setting.apply( settings );
}
@Override
public int hashCode()
{
return name().hashCode();
}
@Override
public boolean equals( Object obj )
{
if ( obj instanceof Setting<?> && ((Setting<?>) obj).name().equals( name() ) )
{
return true;
}
return false;
}
};
}
}
@Description( "Host and port for the Neo4j Bolt Protocol Websocket" )
public static final Setting<HostnamePort> websocket_address =
setting( "dbms.bolt.websocket.address", HOSTNAME_PORT, "localhost:7688" );
public enum EncryptionLevel
{
REQUIRED, OPTIONAL, DISABLED
}
public interface Dependencies
@@ -109,38 +161,76 @@ public Lifecycle newKernelExtension( final Dependencies dependencies ) throws Th
final LogService logging = dependencies.logService();
final Log log = logging.getInternalLog( Sessions.class );
final HostnamePort socketAddress = config.get( Settings.socket_address );
final HostnamePort webSocketAddress = config.get( Settings.websocket_address );
final LifeSupport life = new LifeSupport();
if ( config.get( Settings.enabled ) )
{
final JobScheduler scheduler = dependencies.scheduler();
final JobScheduler scheduler = dependencies.scheduler();
final Sessions sessions = new ThreadedSessions(
life.add( new StandardSessions( api, dependencies.usageData(), logging ) ),
scheduler, logging );
final Sessions sessions = new ThreadedSessions(
life.add( new StandardSessions( api, dependencies.usageData(), logging ) ),
scheduler, logging );
SslContext sslCtx = null;
if ( config.get( Settings.tls_enabled ) )
{
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer( ssc.certificate(), ssc.privateKey() ).build();
}
PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> availableVersions = longObjectMap();
List<NettyServer.ProtocolInitializer> connectors = new ArrayList<>();
List<ConfigView> view = config.view( Settings.connector_group );
for( ConfigView connector: view )
{
final HostnamePort socketAddress = connector.get( Settings.socket_address );
availableVersions.put( BoltProtocolV1.VERSION, channel -> new BoltProtocolV1( logging, sessions.newSession(), channel ) );
if ( connector.get( Settings.enabled ) )
{
SslContext sslCtx;
boolean requireEncryption = false;
switch ( connector.get( Settings.tls_level ) )
{
// self signed cert should be generated when encryption is REQUIRED or OPTIONAL on the server
// while no cert is generated if encryption is DISABLED
case REQUIRED:
requireEncryption = true;
// no break here
case OPTIONAL:
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer( ssc.certificate(), ssc.privateKey() ).build();
break;
default:
// case DISABLED:
sslCtx = null;
break;
}
PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions;
if( requireEncryption )
{
availableVersions = newVersions( logging, new EncryptionRequiredSessions( sessions ) );
}
else
{
availableVersions = newVersions( logging, sessions );
}
connectors.add( new SocketTransport( socketAddress, sslCtx, logging.getInternalLogProvider(),
availableVersions ) );
}
}
// Start services
life.add( new NettyServer( scheduler.threadFactory( boltNetworkIO ), asList(
new SocketTransport( socketAddress, sslCtx, logging.getInternalLogProvider(), availableVersions ),
new WebSocketTransport( webSocketAddress, sslCtx, logging.getInternalLogProvider(), availableVersions ) ) ) );
if( connectors.size() > 0 )
{
life.add( new NettyServer( scheduler.threadFactory( boltNetworkIO ), connectors ) );
log.info( "Bolt Server extension loaded." );
}
return life;
}
private PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> newVersions( LogService logging,
Sessions sessions )
{
PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions = longObjectMap();
availableVersions.put(
BoltProtocolV1.VERSION,
( channel, isEncrypted ) -> new BoltProtocolV1( logging, sessions.newSession( isEncrypted ), channel )
);
return availableVersions;
}
}
@@ -25,8 +25,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import java.util.function.Function;
import java.util.function.BiFunction;
import org.neo4j.collection.primitive.PrimitiveLongObjectMap;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.logging.LogProvider;
@@ -39,10 +38,10 @@
private final HostnamePort address;
private final SslContext sslCtx;
private LogProvider logging;
private final PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> protocolVersions;
private final PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> protocolVersions;
public SocketTransport( HostnamePort address, SslContext sslCtx, LogProvider logging,
PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> protocolVersions)
PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> protocolVersions )
{
this.address = address;
this.sslCtx = sslCtx;
@@ -59,16 +58,9 @@ public SocketTransport( HostnamePort address, SslContext sslCtx, LogProvider log
public void initChannel( SocketChannel ch ) throws Exception
{
ch.config().setAllocator( PooledByteBufAllocator.DEFAULT );
if( sslCtx != null )
{
ch.pipeline().addLast( sslCtx.newHandler( ch.alloc() ) );
}
ch.pipeline().addLast( new SocketTransportHandler(
new SocketTransportHandler.ProtocolChooser( protocolVersions ), logging ) );
ch.pipeline().addLast( new TransportSelectionHandler( sslCtx, logging, protocolVersions ) );
}
} ;
};
}
@Override
@@ -26,7 +26,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.Function;
import java.util.function.BiFunction;
import org.neo4j.collection.primitive.PrimitiveLongObjectMap;
import org.neo4j.logging.Log;
@@ -159,17 +159,20 @@ private void chooseProtocolVersion( ChannelHandlerContext ctx, ByteBuf buffer )
*/
public static class ProtocolChooser
{
private final PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> availableVersions;
private final PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions;
private final boolean isEncrypted;
private final ByteBuffer handShake = ByteBuffer.allocateDirect( 5 * 4 ).order( ByteOrder.BIG_ENDIAN );
private static final int MAGIC_PREAMBLE = 0x6060B017;
public static final int BOLT_MAGIC_PREAMBLE = 0x6060B017;
private BoltProtocol protocol;
/**
* @param availableVersions version -> protocol mapping
*/
public ProtocolChooser( PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> availableVersions )
public ProtocolChooser( PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions, boolean isEncrypted )
{
this.availableVersions = availableVersions;
this.isEncrypted = isEncrypted;
}
public HandshakeOutcome handleVersionHandshakeChunk( ByteBuf buffer, Channel ch )
@@ -189,7 +192,7 @@ public HandshakeOutcome handleVersionHandshakeChunk( ByteBuf buffer, Channel ch
{
handShake.flip();
//Check so that handshake starts with 0x606 0B017
if ( handShake.getInt() != MAGIC_PREAMBLE )
if ( handShake.getInt() != BOLT_MAGIC_PREAMBLE )
{
return HandshakeOutcome.INVALID_HANDSHAKE;
}
@@ -199,7 +202,7 @@ public HandshakeOutcome handleVersionHandshakeChunk( ByteBuf buffer, Channel ch
long suggestion = handShake.getInt() & 0xFFFFFFFFL;
if ( availableVersions.containsKey( suggestion ) )
{
protocol = availableVersions.get( suggestion ).apply( ch );
protocol = availableVersions.get( suggestion ).apply( ch, isEncrypted );
return HandshakeOutcome.PROTOCOL_CHOSEN;
}
}
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.