diff --git a/community/bolt/pom.xml b/community/bolt/pom.xml index ddddbe93d349..1b7cc42cb0f7 100644 --- a/community/bolt/pom.xml +++ b/community/bolt/pom.xml @@ -104,10 +104,6 @@ org.junit.jupiter junit-jupiter-engine - - org.junit.jupiter - junit-jupiter-params - org.junit.jupiter junit-jupiter-params diff --git a/community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java b/community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java index 3fa7c9265b4c..a42dbe761f35 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/BoltServer.java @@ -124,12 +124,12 @@ public void start() throws Throwable createConnectionFactory( config, boltSchedulerProvider, throttleGroup, logService, clock ); BoltStateMachineFactory boltStateMachineFactory = createBoltFactory( authentication, clock ); - BoltProtocolFactory boltProtocolInstaller = createBoltProtocolInstallerFactory( boltConnectionFactory, boltStateMachineFactory ); + BoltProtocolFactory boltProtocolFactory = createBoltProtocolFactory( boltConnectionFactory, boltStateMachineFactory ); if ( !config.enabledBoltConnectors().isEmpty() && !config.get( GraphDatabaseSettings.disconnected ) ) { NettyServer server = new NettyServer( jobScheduler.threadFactory( boltNetworkIO ), - createConnectors( boltProtocolInstaller, throttleGroup, boltLogging, log ), connectorPortRegister, userLog ); + createConnectors( boltProtocolFactory, throttleGroup, boltLogging, log ), connectorPortRegister, userLog ); life.add( server ); log.info( "Bolt server loaded" ); } @@ -152,15 +152,15 @@ private BoltConnectionFactory createConnectionFactory( Config config, BoltSchedu config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), monitors ); } - private Map createConnectors( BoltProtocolFactory handlerFactory, + private Map createConnectors( BoltProtocolFactory boltProtocolFactory, TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log ) { return config.enabledBoltConnectors() .stream() - .collect( toMap( identity(), connector -> createProtocolInitializer( connector, handlerFactory, throttleGroup, boltLogging, log ) ) ); + .collect( toMap( identity(), connector -> createProtocolInitializer( connector, boltProtocolFactory, throttleGroup, boltLogging, log ) ) ); } - private ProtocolInitializer createProtocolInitializer( BoltConnector connector, BoltProtocolFactory handlerFactory, + private ProtocolInitializer createProtocolInitializer( BoltConnector connector, BoltProtocolFactory boltProtocolFactory, TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log ) { SslContext sslCtx; @@ -198,7 +198,7 @@ private ProtocolInitializer createProtocolInitializer( BoltConnector connector, ListenSocketAddress listenAddress = config.get( connector.listen_address ); return new SocketTransport( connector.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging, - throttleGroup, handlerFactory ); + throttleGroup, boltProtocolFactory ); } private static SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config ) @@ -225,7 +225,7 @@ private Authentication createAuthentication() dependencyResolver.resolveDependency( UserManagerSupplier.class ) ); } - private BoltProtocolFactory createBoltProtocolInstallerFactory( BoltConnectionFactory connectionFactory, + private BoltProtocolFactory createBoltProtocolFactory( BoltConnectionFactory connectionFactory, BoltStateMachineFactory stateMachineFactory ) { return new DefaultBoltProtocolFactory( connectionFactory, stateMachineFactory, logService ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/messaging/Neo4jPack.java b/community/bolt/src/main/java/org/neo4j/bolt/messaging/Neo4jPack.java index 87fba03dc457..b85f314e8dec 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/messaging/Neo4jPack.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/messaging/Neo4jPack.java @@ -21,8 +21,6 @@ import java.io.IOException; -import org.neo4j.bolt.v1.packstream.PackInput; -import org.neo4j.bolt.v1.packstream.PackOutput; import org.neo4j.values.AnyValue; import org.neo4j.values.virtual.MapValue; @@ -30,7 +28,7 @@ * Represents a single Bolt message format by exposing a {@link Packer packer} and {@link Unpacker unpacker} * for primitives of this format. */ -public interface Neo4jPack +public interface Neo4jPack extends PackProvider, UnpackerProvider { interface Packer { @@ -61,10 +59,5 @@ interface Unpacker long unpackListHeader() throws IOException; } - - Packer newPacker( PackOutput output ); - - Unpacker newUnpacker( PackInput input ); - long version(); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/messaging/PackProvider.java b/community/bolt/src/main/java/org/neo4j/bolt/messaging/PackProvider.java new file mode 100644 index 000000000000..2b6f3134f734 --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/messaging/PackProvider.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.messaging; + +import org.neo4j.bolt.v1.packstream.PackOutput; + +public interface PackProvider +{ + Neo4jPack.Packer newPacker( PackOutput output ); +} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/messaging/UnpackerProvider.java b/community/bolt/src/main/java/org/neo4j/bolt/messaging/UnpackerProvider.java new file mode 100644 index 000000000000..7b10c8eb8357 --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/messaging/UnpackerProvider.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.messaging; + +import org.neo4j.bolt.v1.packstream.PackInput; + +public interface UnpackerProvider +{ + Neo4jPack.Unpacker newUnpacker( PackInput input ); +} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionFactory.java index afc9408016a8..7c87a5716e62 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/BoltConnectionFactory.java @@ -27,6 +27,7 @@ public interface BoltConnectionFactory * Create a new connection bound to the specified channel * * @param channel the underlying channel + * @param boltStateMachine to handle state change of the connection * @return the newly created connection instance */ BoltConnection newConnection( BoltChannel channel, BoltStateMachine boltStateMachine ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/BoltProtocolFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/BoltProtocolFactory.java index 2cce7ece6ea9..eb41b787437b 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/BoltProtocolFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/BoltProtocolFactory.java @@ -25,7 +25,7 @@ /** * Represents a component that instantiates Bolt protocol handlers. * - * @see BoltProtocolPipelineInstaller + * @see BoltProtocol */ @FunctionalInterface public interface BoltProtocolFactory diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/SocketTransport.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/SocketTransport.java index 73b636d4d132..282441bc09ee 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/SocketTransport.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/SocketTransport.java @@ -40,12 +40,12 @@ public class SocketTransport implements NettyServer.ProtocolInitializer private final LogProvider logging; private final BoltMessageLogging boltLogging; private final TransportThrottleGroup throttleGroup; - private final BoltProtocolFactory handlerFactory; + private final BoltProtocolFactory boltProtocolFactory; public SocketTransport( String connector, ListenSocketAddress address, SslContext sslCtx, boolean encryptionRequired, LogProvider logging, BoltMessageLogging boltLogging, TransportThrottleGroup throttleGroup, - BoltProtocolFactory handlerFactory ) + BoltProtocolFactory boltProtocolFactory ) { this.connector = connector; this.address = address; @@ -54,7 +54,7 @@ public SocketTransport( String connector, ListenSocketAddress address, SslContex this.logging = logging; this.boltLogging = boltLogging; this.throttleGroup = throttleGroup; - this.handlerFactory = handlerFactory; + this.boltProtocolFactory = boltProtocolFactory; } @Override @@ -74,7 +74,7 @@ public void initChannel( SocketChannel ch ) ch.closeFuture().addListener( future -> throttleGroup.uninstall( ch ) ); TransportSelectionHandler transportSelectionHandler = new TransportSelectionHandler( connector, sslCtx, - encryptionRequired, false, logging, handlerFactory, boltLogging ); + encryptionRequired, false, logging, boltProtocolFactory, boltLogging ); ch.pipeline().addLast( transportSelectionHandler ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportSelectionHandler.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportSelectionHandler.java index 466dafd6e0ce..e012670a83c5 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportSelectionHandler.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/TransportSelectionHandler.java @@ -52,10 +52,10 @@ public class TransportSelectionHandler extends ByteToMessageDecoder private final boolean isEncrypted; private final LogProvider logging; private final BoltMessageLogging boltLogging; - private final BoltProtocolFactory handlerFactory; + private final BoltProtocolFactory boltProtocolFactory; TransportSelectionHandler( String connector, SslContext sslCtx, boolean encryptionRequired, boolean isEncrypted, LogProvider logging, - BoltProtocolFactory handlerFactory, BoltMessageLogging boltLogging ) + BoltProtocolFactory boltProtocolFactory, BoltMessageLogging boltLogging ) { this.connector = connector; this.sslCtx = sslCtx; @@ -63,7 +63,7 @@ public class TransportSelectionHandler extends ByteToMessageDecoder this.isEncrypted = isEncrypted; this.logging = logging; this.boltLogging = boltLogging; - this.handlerFactory = handlerFactory; + this.boltProtocolFactory = boltProtocolFactory; } @Override @@ -121,7 +121,7 @@ private void enableSsl( ChannelHandlerContext ctx ) { ChannelPipeline p = ctx.pipeline(); p.addLast( sslCtx.newHandler( ctx.alloc() ) ); - p.addLast( new TransportSelectionHandler( connector, null, encryptionRequired, true, logging, handlerFactory, boltLogging ) ); + p.addLast( new TransportSelectionHandler( connector, null, encryptionRequired, true, logging, boltProtocolFactory, boltLogging ) ); p.remove( this ); } @@ -147,7 +147,7 @@ private void switchToWebsocket( ChannelHandlerContext ctx ) private ProtocolHandshaker newHandshaker( ChannelHandlerContext ctx ) { - return new ProtocolHandshaker( handlerFactory, BoltChannel.open( connector, ctx.channel(), boltLogging.newLogger( ctx.channel() ) ), logging, + return new ProtocolHandshaker( boltProtocolFactory, BoltChannel.open( connector, ctx.channel(), boltLogging.newLogger( ctx.channel() ) ), logging, encryptionRequired, isEncrypted ); } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/pipeline/MessageDecoder.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/pipeline/MessageDecoder.java index 4d127ef3700c..04b39c8a011a 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/pipeline/MessageDecoder.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/pipeline/MessageDecoder.java @@ -23,12 +23,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import java.util.function.Function; - import org.neo4j.bolt.messaging.BoltRequestMessageReader; import org.neo4j.bolt.messaging.Neo4jPack; +import org.neo4j.bolt.messaging.UnpackerProvider; import org.neo4j.bolt.v1.packstream.ByteBufInput; -import org.neo4j.bolt.v1.packstream.PackInput; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.logging.Log; @@ -41,10 +39,10 @@ public class MessageDecoder extends SimpleChannelInboundHandler private final BoltRequestMessageReader reader; private final Log log; - public MessageDecoder( Function unpackProvider, BoltRequestMessageReader reader, LogService logService ) + public MessageDecoder( UnpackerProvider unpackProvider, BoltRequestMessageReader reader, LogService logService ) { this.input = new ByteBufInput(); - this.unpacker = unpackProvider.apply( input ); + this.unpacker = unpackProvider.newUnpacker( input ); this.reader = reader; this.log = logService.getInternalLog( getClass() ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/BoltProtocolV1.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/BoltProtocolV1.java index 3ebfda6eac03..cede3eff3531 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/BoltProtocolV1.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/BoltProtocolV1.java @@ -74,7 +74,7 @@ public void install() pipeline.addLast( new ChunkDecoder() ); pipeline.addLast( new MessageAccumulator() ); - pipeline.addLast( new MessageDecoder( neo4jPack::newUnpacker, messageReader, logging ) ); + pipeline.addLast( new MessageDecoder( neo4jPack, messageReader, logging ) ); pipeline.addLast( new HouseKeeper( connection, logging ) ); } @@ -91,7 +91,7 @@ public long version() public static BoltRequestMessageReader createBoltMessageReaderV1( BoltChannel channel, Neo4jPack neo4jPack, BoltConnection connection, LogService logging ) { - BoltResponseMessageWriter responseWriter = new BoltResponseMessageWriter( neo4jPack::newPacker, connection.output(), logging, channel.log() ); + BoltResponseMessageWriter responseWriter = new BoltResponseMessageWriter( neo4jPack, connection.output(), logging, channel.log() ); return new BoltRequestMessageReaderV1( connection, responseWriter, channel.log(), logging ); } } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java index a8ca6a2ed4ca..c1a7e4a6b0d7 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java @@ -20,10 +20,10 @@ package org.neo4j.bolt.v1.messaging; import java.io.IOException; -import java.util.function.Function; import org.neo4j.bolt.logging.BoltMessageLogger; import org.neo4j.bolt.messaging.Neo4jPack; +import org.neo4j.bolt.messaging.PackProvider; import org.neo4j.bolt.v1.packstream.PackOutput; import org.neo4j.cypher.result.QueryResult; import org.neo4j.function.ThrowingAction; @@ -48,11 +48,11 @@ public class BoltResponseMessageWriter implements BoltResponseMessageHandler private final BoltMessageLogger messageLogger; private final Log log; - public BoltResponseMessageWriter( Function packerProvider, PackOutput output, LogService logService, + public BoltResponseMessageWriter( PackProvider packerProvider, PackOutput output, LogService logService, BoltMessageLogger messageLogger ) { this.output = output; - this.packer = packerProvider.apply( output ); + this.packer = packerProvider.newPacker( output ); this.messageLogger = messageLogger; this.log = logService.getInternalLog( getClass() ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactory.java index 32288a97e97d..ab149071b1e9 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactory.java @@ -30,6 +30,7 @@ public interface BoltStateMachineFactory /** * Generate a new state machine. * + * @param protocolVersion used to select state machine version * @param boltChannel channel over which Bolt massages can be exchanged * @return new {@link BoltStateMachine} instance */ diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactoryImplTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactoryImplTest.java index bd7a26c5b166..bda26354eda4 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactoryImplTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineFactoryImplTest.java @@ -27,6 +27,8 @@ import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.runtime.BoltStateMachine; import org.neo4j.bolt.security.auth.Authentication; +import org.neo4j.bolt.v1.BoltProtocolV1; +import org.neo4j.bolt.v2.BoltProtocolV2; import org.neo4j.graphdb.DependencyResolver; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.GraphDatabaseQueryService; @@ -47,8 +49,8 @@ public class BoltStateMachineFactoryImplTest private static final Clock CLOCK = Clock.systemUTC(); private static final BoltChannel CHANNEL = mock( BoltChannel.class ); - @ParameterizedTest - @ValueSource( longs = {1, 2} ) + @ParameterizedTest( name = "V{0}" ) + @ValueSource( longs = {BoltProtocolV1.VERSION, BoltProtocolV2.VERSION} ) public void shouldCreateBoltStateMachines( long protocolVersion ) { BoltStateMachineFactoryImpl factory = newBoltFactory(); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java index 399b168cd564..ae9c03c0ceb1 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java @@ -20,7 +20,6 @@ package org.neo4j.bolt.v1.transport.socket; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelPipeline; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.After; import org.junit.Test; @@ -34,15 +33,12 @@ import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.BoltProtocol; import org.neo4j.bolt.logging.NullBoltMessageLogger; -import org.neo4j.bolt.messaging.BoltRequestMessageReader; import org.neo4j.bolt.messaging.Neo4jPack; import org.neo4j.bolt.messaging.RequestMessage; import org.neo4j.bolt.runtime.BoltResponseHandler; import org.neo4j.bolt.runtime.BoltStateMachine; import org.neo4j.bolt.runtime.SynchronousBoltConnection; -import org.neo4j.bolt.transport.pipeline.ChunkDecoder; -import org.neo4j.bolt.transport.pipeline.MessageAccumulator; -import org.neo4j.bolt.transport.pipeline.MessageDecoder; +import org.neo4j.bolt.v1.BoltProtocolV1; import org.neo4j.bolt.v1.messaging.BoltRequestMessageWriter; import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.messaging.RecordingByteChannel; @@ -60,7 +56,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.neo4j.bolt.v1.BoltProtocolV1.createBoltMessageReaderV1; import static org.neo4j.values.virtual.VirtualValues.EMPTY_MAP; /** @@ -144,27 +139,7 @@ private void testPermutation( byte[] unfragmented, ByteBuf[] fragments ) throws BoltStateMachine machine = mock( BoltStateMachine.class ); SynchronousBoltConnection boltConnection = new SynchronousBoltConnection( machine ); NullLogService logging = NullLogService.getInstance(); - BoltRequestMessageReader messageReader = createBoltMessageReaderV1( boltChannel, neo4jPack, boltConnection, logging ); - BoltProtocol boltProtocol = new BoltProtocol() - { - @Override - public void install() - { - ChannelPipeline pipeline = boltChannel.rawChannel().pipeline(); - - pipeline.addLast( new ChunkDecoder() ); - pipeline.addLast( new MessageAccumulator() ); - - pipeline.addLast( new MessageDecoder( neo4jPack::newUnpacker, messageReader, logging ) ); - } - - @Override - public long version() - { - return -1; - } - }; - + BoltProtocol boltProtocol = new BoltProtocolV1( boltChannel, ( ch, s ) -> boltConnection, ( v, ch ) -> machine, logging ); boltProtocol.install(); // When data arrives split up according to the current permutation