From 43236b94e9e6df3c38086a9176889f5553125a3e Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 9 Feb 2018 11:47:04 +0100 Subject: [PATCH] Moved bolt version into Neo4jPack & better test parameterization `Neo4jPack` is currently the only versioned part of the bolt server stack. This commit moves versioning into it and makes all ITs in the bolt module use handshakes with parameterized versions. --- .../DefaultBoltProtocolHandlerFactory.java | 19 +++-- .../neo4j/bolt/v1/messaging/Neo4jPack.java | 2 + .../neo4j/bolt/v1/messaging/Neo4jPackV1.java | 14 +++ ... => BoltMessagingProtocolHandlerImpl.java} | 19 ++--- .../neo4j/bolt/v2/messaging/Neo4jPackV2.java | 8 ++ .../BoltMessagingProtocolV2Handler.java | 44 ---------- .../bolt/AbstractBoltTransportsTest.java | 2 +- ...DefaultBoltProtocolHandlerFactoryTest.java | 16 ++-- .../v1/runtime/integration/BoltConfigIT.java | 7 +- ...BoltMessagingProtocolHandlerImplTest.java} | 33 ++++--- .../integration/AuthenticationIT.java | 85 +++++++++---------- .../BoltChannelAutoReadLimiterIT.java | 5 +- .../integration/ConcurrentAccessIT.java | 5 +- .../integration/TransportErrorIT.java | 17 ++-- .../integration/TransportSessionIT.java | 55 ++++++------ .../integration/TransportTestUtil.java | 10 +++ .../socket/FragmentedMessageDeliveryTest.java | 23 ++++- .../socket/SocketTransportHandlerTest.java | 44 ++++++++-- .../BoltMessagingProtocolV2HandlerTest.java | 45 ---------- 19 files changed, 224 insertions(+), 229 deletions(-) rename community/bolt/src/main/java/org/neo4j/bolt/v1/transport/{BoltMessagingProtocolV1Handler.java => BoltMessagingProtocolHandlerImpl.java} (90%) delete mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2Handler.java rename community/bolt/src/test/java/org/neo4j/bolt/v1/transport/{BoltMessagingProtocolV1HandlerTest.java => BoltMessagingProtocolHandlerImplTest.java} (81%) delete mode 100644 community/bolt/src/test/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2HandlerTest.java diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java index d1b74f02fefd..8afcb93f0e94 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java @@ -20,11 +20,13 @@ package org.neo4j.bolt.transport; import org.neo4j.bolt.BoltChannel; +import org.neo4j.bolt.v1.messaging.Neo4jPack; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltChannelAutoReadLimiter; import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.WorkerFactory; -import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; -import org.neo4j.bolt.v2.transport.BoltMessagingProtocolV2Handler; +import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl; +import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.logging.Log; @@ -45,13 +47,13 @@ public DefaultBoltProtocolHandlerFactory( WorkerFactory workerFactory, Transport @Override public BoltMessagingProtocolHandler create( long protocolVersion, BoltChannel channel ) { - if ( protocolVersion == BoltMessagingProtocolV1Handler.VERSION_NUMBER ) + if ( protocolVersion == Neo4jPackV1.VERSION ) { - return new BoltMessagingProtocolV1Handler( channel, newBoltWorker( channel ), throttleGroup, logService ); + return newMessagingProtocolHandler( channel, new Neo4jPackV1() ); } - else if ( protocolVersion == BoltMessagingProtocolV2Handler.VERSION ) + else if ( protocolVersion == Neo4jPackV2.VERSION ) { - return new BoltMessagingProtocolV2Handler( channel, newBoltWorker( channel ), throttleGroup, logService ); + return newMessagingProtocolHandler( channel, new Neo4jPackV2() ); } else { @@ -59,6 +61,11 @@ else if ( protocolVersion == BoltMessagingProtocolV2Handler.VERSION ) } } + private BoltMessagingProtocolHandler newMessagingProtocolHandler( BoltChannel channel, Neo4jPack neo4jPack ) + { + return new BoltMessagingProtocolHandlerImpl( channel, newBoltWorker( channel ), neo4jPack, throttleGroup, logService ); + } + private BoltWorker newBoltWorker( BoltChannel channel ) { Log log = logService.getInternalLog( BoltChannelAutoReadLimiter.class ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java index 367db9645420..ccea2ceb8b43 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java @@ -70,4 +70,6 @@ interface Unpacker Packer newPacker( PackOutput output ); Unpacker newUnpacker( PackInput input ); + + int version(); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java index f5d31c046942..322c452a3e84 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java @@ -53,6 +53,8 @@ */ public class Neo4jPackV1 implements Neo4jPack { + public static final int VERSION = 1; + public static final byte NODE = 'N'; public static final byte RELATIONSHIP = 'R'; public static final byte UNBOUND_RELATIONSHIP = 'r'; @@ -70,6 +72,18 @@ public Neo4jPack.Unpacker newUnpacker( PackInput input ) return new UnpackerV1( input ); } + @Override + public int version() + { + return VERSION; + } + + @Override + public String toString() + { + return getClass().getSimpleName(); + } + protected static class PackerV1 extends PackStream.Packer implements AnyValueWriter, Neo4jPack.Packer { private Error error; diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java similarity index 90% rename from community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java rename to community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java index 8596aeebd563..5ece821375b8 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java @@ -26,12 +26,11 @@ import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.bolt.BoltChannel; -import org.neo4j.bolt.v1.messaging.Neo4jPack; import org.neo4j.bolt.transport.BoltMessagingProtocolHandler; import org.neo4j.bolt.transport.TransportThrottleGroup; import org.neo4j.bolt.v1.messaging.BoltMessageRouter; import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter; -import org.neo4j.bolt.v1.messaging.Neo4jPackV1; +import org.neo4j.bolt.v1.messaging.Neo4jPack; import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.logging.Log; @@ -42,15 +41,14 @@ *

* Versions of the framing protocol are lock-step with the messaging protocol versioning. */ -public class BoltMessagingProtocolV1Handler implements BoltMessagingProtocolHandler +public class BoltMessagingProtocolHandlerImpl implements BoltMessagingProtocolHandler { - public static final int VERSION_NUMBER = 1; - private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 8192; private final ChunkedOutput chunkedOutput; private final BoltResponseMessageWriter packer; private final BoltV1Dechunker dechunker; + private final Neo4jPack neo4jPack; private final BoltWorker worker; @@ -58,15 +56,10 @@ public class BoltMessagingProtocolV1Handler implements BoltMessagingProtocolHand private final Log internalLog; - public BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker, - TransportThrottleGroup throttleGroup, LogService logging ) - { - this( boltChannel, worker, new Neo4jPackV1(), throttleGroup, logging ); - } - - protected BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker, Neo4jPack neo4jPack, + public BoltMessagingProtocolHandlerImpl( BoltChannel boltChannel, BoltWorker worker, Neo4jPack neo4jPack, TransportThrottleGroup throttleGroup, LogService logging ) { + this.neo4jPack = neo4jPack; this.chunkedOutput = new ChunkedOutput( boltChannel.rawChannel(), DEFAULT_OUTPUT_BUFFER_SIZE, throttleGroup ); this.packer = new BoltResponseMessageWriter( neo4jPack.newPacker( chunkedOutput ), chunkedOutput, boltChannel.log() ); @@ -105,7 +98,7 @@ public void handle( ChannelHandlerContext channelContext, ByteBuf data ) @Override public int version() { - return VERSION_NUMBER; + return neo4jPack.version(); } @Override diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v2/messaging/Neo4jPackV2.java b/community/bolt/src/main/java/org/neo4j/bolt/v2/messaging/Neo4jPackV2.java index 5078969e56e5..713fc46f9f04 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v2/messaging/Neo4jPackV2.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v2/messaging/Neo4jPackV2.java @@ -36,6 +36,8 @@ public class Neo4jPackV2 extends Neo4jPackV1 { + public static final int VERSION = 2; + public static final byte POINT_2D = 'X'; public static final byte POINT_3D = 'Y'; @@ -51,6 +53,12 @@ public Neo4jPack.Unpacker newUnpacker( PackInput input ) return new UnpackerV2( input ); } + @Override + public int version() + { + return VERSION; + } + private static class PackerV2 extends Neo4jPackV1.PackerV1 { PackerV2( PackOutput output ) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2Handler.java b/community/bolt/src/main/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2Handler.java deleted file mode 100644 index ab297a03278a..000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2Handler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v2.transport; - -import org.neo4j.bolt.BoltChannel; -import org.neo4j.bolt.transport.TransportThrottleGroup; -import org.neo4j.bolt.v1.runtime.BoltWorker; -import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; -import org.neo4j.bolt.v2.messaging.Neo4jPackV2; -import org.neo4j.kernel.impl.logging.LogService; - -public class BoltMessagingProtocolV2Handler extends BoltMessagingProtocolV1Handler -{ - public static final int VERSION = 2; - - public BoltMessagingProtocolV2Handler( BoltChannel boltChannel, BoltWorker worker, - TransportThrottleGroup throttleGroup, LogService logging ) - { - super( boltChannel, worker, new Neo4jPackV2(), throttleGroup, logging ); - } - - @Override - public int version() - { - return VERSION; - } -} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/AbstractBoltTransportsTest.java b/community/bolt/src/test/java/org/neo4j/bolt/AbstractBoltTransportsTest.java index e93235794180..b6eddca029b0 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/AbstractBoltTransportsTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/AbstractBoltTransportsTest.java @@ -114,6 +114,6 @@ protected void reconnect() throws Exception private static String newName( Class connectionClass, Neo4jPack neo4jPack ) { - return connectionClass.getSimpleName() + " & " + neo4jPack.getClass().getSimpleName(); + return connectionClass.getSimpleName() + " & " + neo4jPack; } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java index d8f68d019b3c..b02a04ba9ec9 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java @@ -25,16 +25,14 @@ import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.logging.NullBoltMessageLogger; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.WorkerFactory; -import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; -import org.neo4j.bolt.v2.transport.BoltMessagingProtocolV2Handler; +import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.kernel.impl.logging.NullLogService; -import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.RETURNS_MOCKS; @@ -48,13 +46,13 @@ public class DefaultBoltProtocolHandlerFactoryTest @Test public void shouldCreateV1Handler() { - testHandlerCreation( BoltMessagingProtocolV1Handler.VERSION, BoltMessagingProtocolV1Handler.class ); + testHandlerCreation( Neo4jPackV1.VERSION ); } @Test public void shouldCreateV2Handler() { - testHandlerCreation( BoltMessagingProtocolV2Handler.VERSION, BoltMessagingProtocolV2Handler.class ); + testHandlerCreation( Neo4jPackV2.VERSION ); } @Test @@ -71,8 +69,7 @@ public void shouldCreateNothingForUnknownProtocolVersion() assertNull( handler ); } - private static void testHandlerCreation( int protocolVersion, - Class expectedHandlerClass ) + private static void testHandlerCreation( int protocolVersion ) { BoltChannel boltChannel = BoltChannel.open( newChannelCtxMock(), NullBoltMessageLogger.getInstance() ); WorkerFactory workerFactory = mock( WorkerFactory.class ); @@ -85,8 +82,7 @@ private static void testHandlerCreation( int protocolVersion, BoltMessagingProtocolHandler handler = factory.create( protocolVersion, boltChannel ); - // correct handler handler is created - assertThat( handler, instanceOf( expectedHandlerClass ) ); + // handler with correct version is created assertEquals( protocolVersion, handler.version() ); // it uses the expected worker verify( workerFactory ).newWorker( same( boltChannel ), any() ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/BoltConfigIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/BoltConfigIT.java index 8864bc0a0ff4..788510aa4696 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/BoltConfigIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/integration/BoltConfigIT.java @@ -34,7 +34,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket.DEFAULT_CONNECTOR_KEY; import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyDisconnects; -import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives; import static org.neo4j.kernel.configuration.BoltConnector.EncryptionLevel.REQUIRED; public class BoltConfigIT extends AbstractBoltTransportsTest @@ -69,7 +68,7 @@ public void shouldSupportMultipleConnectors() throws Throwable private void assertConnectionRejected( HostnamePort address, TransportConnection client ) throws Exception { client.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ); + .send( util.defaultAcceptedVersions() ); assertThat( client, eventuallyDisconnects() ); } @@ -77,8 +76,8 @@ private void assertConnectionRejected( HostnamePort address, TransportConnection private void assertConnectionAccepted( HostnamePort address, TransportConnection client ) throws Exception { client.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", emptyMap() ) ) ); - assertThat( client, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( client, util.eventuallyReceivesSelectedProtocolVersion() ); } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImplTest.java similarity index 81% rename from community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java rename to community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImplTest.java index c4ef3a516900..c9cfe71f9c98 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImplTest.java @@ -30,10 +30,11 @@ import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.transport.BoltMessagingProtocolHandler; -import org.neo4j.bolt.transport.TransportThrottleGroup; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltStateMachine; import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; +import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.SimpleLogService; import org.neo4j.logging.AssertableLogProvider; @@ -49,9 +50,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.neo4j.bolt.transport.TransportThrottleGroup.NO_THROTTLE; import static org.neo4j.logging.AssertableLogProvider.inLog; -public class BoltMessagingProtocolV1HandlerTest +public class BoltMessagingProtocolHandlerImplTest { @Test public void shouldNotTalkToChannelDirectlyOnFatalError() @@ -62,9 +64,7 @@ public void shouldNotTalkToChannelDirectlyOnFatalError() when( boltChannel.rawChannel() ).thenReturn( outputChannel ); BoltStateMachine machine = mock( BoltStateMachine.class ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, - new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, - NullLogService.getInstance() ); + BoltMessagingProtocolHandlerImpl protocol = newHandler( boltChannel, new SynchronousBoltWorker( machine ) ); verify( outputChannel ).alloc(); // And given inbound data that'll explode when the protocol tries to interpret it @@ -96,9 +96,7 @@ public void closesInputAndOutput() BoltChannel boltChannel = mock( BoltChannel.class ); when( boltChannel.rawChannel() ).thenReturn( outputChannel ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, - new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, - NullLogService.getInstance() ); + BoltMessagingProtocolHandlerImpl protocol = newHandler( boltChannel, new SynchronousBoltWorker( machine ) ); protocol.close(); verify( machine ).close(); @@ -118,13 +116,12 @@ public void messageProcessingErrorIsLogged() BoltChannel boltChannel = mock( BoltChannel.class ); when( boltChannel.rawChannel() ).thenReturn( outputChannel ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, - mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE, logService ); + BoltMessagingProtocolHandlerImpl protocol = newHandler( boltChannel, mock( BoltWorker.class ), logService ); protocol.handle( mock( ChannelHandlerContext.class ), data ); assertableLogProvider.assertExactly( - inLog( BoltMessagingProtocolV1Handler.class ).error( + inLog( BoltMessagingProtocolHandlerImpl.class ).error( equalTo( "Failed to handle incoming Bolt message. Connection will be closed." ), equalTo( error ) ) ); } @@ -132,9 +129,7 @@ public void messageProcessingErrorIsLogged() @Test public void shouldHaveCorrectVersion() { - BoltMessagingProtocolHandler handler = new BoltMessagingProtocolV1Handler( - mock( BoltChannel.class, RETURNS_MOCKS ), mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE, - NullLogService.getInstance() ); + BoltMessagingProtocolHandler handler = newHandler( mock( BoltChannel.class, RETURNS_MOCKS ), mock( BoltWorker.class ) ); assertEquals( 1, handler.version() ); } @@ -157,4 +152,14 @@ private static Channel newChannelMock() when( channel.alloc() ).thenReturn( allocator ); return channel; } + + private static BoltMessagingProtocolHandlerImpl newHandler( BoltChannel channel, BoltWorker worker ) + { + return newHandler( channel, worker, NullLogService.getInstance() ); + } + + private static BoltMessagingProtocolHandlerImpl newHandler( BoltChannel channel, BoltWorker worker, LogService logService ) + { + return new BoltMessagingProtocolHandlerImpl( channel, worker, new Neo4jPackV1(), NO_THROTTLE, logService ); + } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/AuthenticationIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/AuthenticationIT.java index 985dc6e2719f..a669d6dc9166 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/AuthenticationIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/AuthenticationIT.java @@ -59,7 +59,6 @@ import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgIgnored; import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess; import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyDisconnects; -import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives; import static org.neo4j.helpers.collection.MapUtil.map; public class AuthenticationIT extends AbstractBoltTransportsTest @@ -95,13 +94,13 @@ public void shouldRespondWithCredentialsExpiredOnFirstUse() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess( map( "credentials_expired", true, "server", version ) ) ) ); verifyConnectionOpen(); @@ -118,13 +117,13 @@ public void shouldFailIfWrongCredentials() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "wrong", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "The client is unauthorized due to authentication failure." ) ) ); @@ -136,36 +135,36 @@ public void shouldFailIfWrongCredentialsFollowingSuccessfulLogin() throws Throwa { // When change password connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "new_credentials", "secret", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); // When login again with the new password reconnect(); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "secret", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); // When login again with the wrong password reconnect(); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "wrong", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "The client is unauthorized due to authentication failure." ) ) ); @@ -177,14 +176,14 @@ public void shouldFailIfMalformedAuthTokenWrongType() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", singletonList( "neo4j" ), "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "Unsupported authentication token, the value associated with the key `principal` " + "must be a String but was: ArrayList" ) ) ); @@ -197,14 +196,14 @@ public void shouldFailIfMalformedAuthTokenMissingKey() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "this-should-have-been-credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "Unsupported authentication token, missing key `credentials`" ) ) ); @@ -216,13 +215,13 @@ public void shouldFailIfMalformedAuthTokenMissingScheme() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "Unsupported authentication token, missing key `scheme`" ) ) ); @@ -234,14 +233,14 @@ public void shouldFailIfMalformedAuthTokenUnknownScheme() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "unknown" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "Unsupported authentication token, scheme 'unknown' is not supported." ) ) ); @@ -260,13 +259,13 @@ public void shouldFailDifferentlyIfTooManyFailedAuthAttempts() throws Exception { // Done in a loop because we're racing with the clock to get enough failed requests in 5 seconds connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "WHAT_WAS_THE_PASSWORD_AGAIN", "scheme", "basic" ) ) ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( failureMatcher ) ); assertThat( connection, eventuallyDisconnects() ); @@ -284,34 +283,34 @@ public void shouldBeAbleToUpdateCredentials() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "new_credentials", "secret", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); // If I reconnect I cannot use the old password reconnect(); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "The client is unauthorized due to authentication failure." ) ) ); // But the new password works fine reconnect(); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "secret", "scheme", "basic" ) ) ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); } @@ -320,13 +319,13 @@ public void shouldBeAuthenticatedAfterUpdatingCredentials() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "new_credentials", "secret", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); // When @@ -343,13 +342,13 @@ public void shouldBeAbleToChangePasswordUsingBuiltInProcedure() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess( map( "credentials_expired", true, "server", version ) ) ) ); // When @@ -363,22 +362,22 @@ public void shouldBeAbleToChangePasswordUsingBuiltInProcedure() throws Throwable // If I reconnect I cannot use the old password reconnect(); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgFailure( Status.Security.Unauthorized, "The client is unauthorized due to authentication failure." ) ) ); // But the new password works fine reconnect(); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "secret", "scheme", "basic" ) ) ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); } @@ -387,13 +386,13 @@ public void shouldBeAuthenticatedAfterChangePasswordUsingBuiltInProcedure() thro { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess( map( "credentials_expired", true, "server", version ) ) ) ); // When @@ -418,13 +417,13 @@ public void shouldFailWhenReusingTheSamePassword() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess( map( "credentials_expired", true, "server", version ) ) ) ); // When @@ -449,13 +448,13 @@ public void shouldFailWhenSubmittingEmptyPassword() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess( map( "credentials_expired", true, "server", version ) ) ) ); // When @@ -480,13 +479,13 @@ public void shouldNotBeAbleToReadWhenPasswordChangeRequired() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( InitMessage.init( "TestClient/1.1", map( "principal", "neo4j", "credentials", "neo4j", "scheme", "basic" ) ) ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess( map( "credentials_expired", true, "server", version ) ) ) ); // When diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltChannelAutoReadLimiterIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltChannelAutoReadLimiterIT.java index f63a441bda6b..0afe092432ba 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltChannelAutoReadLimiterIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltChannelAutoReadLimiterIT.java @@ -56,7 +56,6 @@ import static org.neo4j.bolt.v1.messaging.message.InitMessage.init; import static org.neo4j.bolt.v1.messaging.message.RunMessage.run; import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess; -import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives; import static org.neo4j.kernel.api.proc.ProcedureSignature.procedureSignature; public class BoltChannelAutoReadLimiterIT extends AbstractBoltTransportsTest @@ -102,11 +101,11 @@ public void largeNumberOfSlowRunningJobsShouldChangeAutoReadState() throws Excep String largeString = StringUtils.repeat( " ", 8 * 1024 ); connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); // when diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/ConcurrentAccessIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/ConcurrentAccessIT.java index 257793076faf..4111d7c6083d 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/ConcurrentAccessIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/ConcurrentAccessIT.java @@ -47,7 +47,6 @@ import static org.neo4j.bolt.v1.messaging.message.PullAllMessage.pullAll; import static org.neo4j.bolt.v1.messaging.message.RunMessage.run; import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess; -import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives; /** * Multiple concurrent users should be able to connect simultaneously. We test this with multiple users running @@ -112,8 +111,8 @@ public Void call() throws Exception { // Connect TransportConnection client = newConnection(); - client.connect( server.lookupDefaultConnector() ).send( util.acceptedVersions( 1, 0, 0, 0 ) ); - assertThat( client, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + client.connect( server.lookupDefaultConnector() ).send( util.defaultAcceptedVersions() ); + assertThat( client, util.eventuallyReceivesSelectedProtocolVersion() ); init( client ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportErrorIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportErrorIT.java index fb85fc676580..caebaf44957d 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportErrorIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportErrorIT.java @@ -35,7 +35,6 @@ import static org.neo4j.bolt.v1.messaging.message.RunMessage.run; import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.serialize; import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyDisconnects; -import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives; public class TransportErrorIT extends AbstractBoltTransportsTest { @@ -57,11 +56,11 @@ public void shouldHandleIncorrectFraming() throws Throwable // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( 32, truncated ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, eventuallyDisconnects() ); } @@ -81,11 +80,11 @@ public void shouldHandleMessagesWithIncorrectFields() throws Throwable // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( 32, invalidMessage ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, eventuallyDisconnects() ); } @@ -104,11 +103,11 @@ public void shouldHandleUnknownMessages() throws Throwable // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( 32, invalidMessage ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, eventuallyDisconnects() ); } @@ -128,11 +127,11 @@ public void shouldHandleUnknownMarkerBytes() throws Throwable // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( 32, invalidMessage ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, eventuallyDisconnects() ); } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportSessionIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportSessionIT.java index 8bebaf58c112..e65048fb8195 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportSessionIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportSessionIT.java @@ -28,6 +28,7 @@ import java.util.HashMap; import org.neo4j.bolt.AbstractBoltTransportsTest; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.graphdb.InputPosition; import org.neo4j.graphdb.SeverityLevel; import org.neo4j.graphdb.factory.GraphDatabaseSettings; @@ -43,6 +44,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; +import static org.junit.Assume.assumeThat; import static org.neo4j.bolt.v1.messaging.message.AckFailureMessage.ackFailure; import static org.neo4j.bolt.v1.messaging.message.DiscardAllMessage.discardAll; import static org.neo4j.bolt.v1.messaging.message.InitMessage.init; @@ -78,10 +80,10 @@ public void shouldNegotiateProtocolVersion() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ); + .send( util.defaultAcceptedVersions() ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); } @Test @@ -100,14 +102,14 @@ public void shouldRunSimpleStatement() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "UNWIND [1,2,3] AS a RETURN a, a * a AS a_squared" ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess( CoreMatchers.allOf( hasEntry( is( "fields" ), @@ -124,14 +126,14 @@ public void shouldRespondWithMetadataToDiscardAll() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "UNWIND [1,2,3] AS a RETURN a, a * a AS a_squared" ), discardAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess( @@ -146,13 +148,13 @@ public void shouldBeAbleToRunQueryAfterAckFailure() throws Throwable { // Given connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "QINVALID" ), pullAll() ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgFailure( Status.Statement.SyntaxError, @@ -176,13 +178,13 @@ public void shouldRunProcedure() throws Throwable { // Given connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "CREATE (n:Test {age: 2}) RETURN n.age AS age" ), pullAll() ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess( CoreMatchers.allOf( hasEntry( is( "fields" ), equalTo( singletonList( "age" ) ) ), @@ -209,14 +211,14 @@ public void shouldHandleDeletedNodes() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "CREATE (n:Test) DELETE n RETURN n" ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess( CoreMatchers.allOf( hasEntry( is( "fields" ), equalTo( singletonList( "n" ) ) ), @@ -240,14 +242,14 @@ public void shouldHandleDeletedRelationships() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "CREATE ()-[r:T {prop: 42}]->() DELETE r RETURN r" ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess( CoreMatchers.allOf( hasEntry( is( "fields" ), equalTo( singletonList( "r" ) ) ), @@ -273,12 +275,12 @@ public void shouldNotLeakStatsToNextStatement() throws Throwable { // Given connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "CREATE (n)" ), pullAll() ) ); - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess(), @@ -303,14 +305,14 @@ public void shouldSendNotifications() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "EXPLAIN MATCH (a:THIS_IS_NOT_A_LABEL) RETURN count(*)" ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess(), @@ -326,18 +328,21 @@ public void shouldSendNotifications() throws Throwable } @Test - public void shouldFailNicelyOnPoints() throws Throwable + public void shouldFailNicelyOnPointsWhenProtocolDoesNotSupportThem() throws Throwable { + // only V1 protocol does not support points + assumeThat( neo4jPack.version(), equalTo( Neo4jPackV1.VERSION ) ); + // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "RETURN point({x:13, y:37, crs:'cartesian'}) as p" ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess( CoreMatchers.allOf( hasEntry( is( "fields" ), equalTo( singletonList( "p" ) ) ), @@ -368,14 +373,14 @@ public void shouldFailNicelyOnNullKeysInMap() throws Throwable // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "RETURN {p}", ValueUtils.asMapValue( params ) ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgFailure( Status.Request.Invalid, @@ -397,14 +402,14 @@ public void shouldFailNicelyWhenDroppingUnknownIndex() throws Throwable { // When connection.connect( address ) - .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.defaultAcceptedVersions() ) .send( util.chunk( init( "TestClient/1.1", emptyMap() ), run( "DROP INDEX on :Movie12345(id)" ), pullAll() ) ); // Then - assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); + assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() ); assertThat( connection, util.eventuallyReceives( msgSuccess(), msgFailure( Status.Schema.IndexDropFailed, diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportTestUtil.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportTestUtil.java index e4d683dad73b..5b1bcb339f97 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportTestUtil.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/TransportTestUtil.java @@ -112,6 +112,11 @@ public byte[] chunk( int chunkSize, byte[]... messages ) return arrayOutput; } + public byte[] defaultAcceptedVersions() + { + return acceptedVersions( neo4jPack.version(), 0, 0, 0 ); + } + public byte[] acceptedVersions( long option1, long option2, long option3, long option4 ) { ByteBuffer bb = ByteBuffer.allocate( 5 * Integer.BYTES ).order( BIG_ENDIAN ); @@ -180,6 +185,11 @@ public int receiveChunkHeader( TransportConnection conn ) throws IOException, In return ((raw[0] & 0xff) << 8 | (raw[1] & 0xff)) & 0xffff; } + public Matcher eventuallyReceivesSelectedProtocolVersion() + { + return eventuallyReceives( new byte[]{0, 0, 0, (byte) neo4jPack.version()} ); + } + public static Matcher eventuallyReceives( final byte[] expected ) { return new TypeSafeMatcher() diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java index 1d17bb89f6c3..6e476a345499 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java @@ -24,14 +24,18 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; +import java.util.List; import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.logging.NullBoltMessageLogger; import org.neo4j.bolt.transport.TransportThrottleGroup; import org.neo4j.bolt.v1.messaging.BoltRequestMessageWriter; +import org.neo4j.bolt.v1.messaging.Neo4jPack; import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.messaging.RecordingByteChannel; import org.neo4j.bolt.v1.messaging.message.RequestMessage; @@ -40,12 +44,15 @@ import org.neo4j.bolt.v1.runtime.BoltResponseHandler; import org.neo4j.bolt.v1.runtime.BoltStateMachine; import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; -import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; +import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl; +import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.util.HexPrinter; import org.neo4j.values.virtual.MapValue; import static io.netty.buffer.Unpooled.wrappedBuffer; +import static org.junit.runners.Parameterized.Parameter; +import static org.junit.runners.Parameterized.Parameters; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -68,6 +75,7 @@ * For each permutation, it delivers the fragments to the protocol implementation, and asserts the protocol handled * them properly. */ +@RunWith( Parameterized.class ) public class FragmentedMessageDeliveryTest { // Only test one chunk size for now, this can be parameterized to test lots of different ones @@ -79,6 +87,15 @@ public class FragmentedMessageDeliveryTest // Only test one message for now. This can be parameterized later to test lots of different ones private RequestMessage[] messages = new RequestMessage[]{RunMessage.run( "Mjölnir" )}; + @Parameter + public Neo4jPack neo4jPack; + + @Parameters( name = "{0}" ) + public static List parameters() + { + return Arrays.asList( new Neo4jPackV1(), new Neo4jPackV2() ); + } + @Test public void testFragmentedMessageDelivery() throws Throwable { @@ -124,8 +141,8 @@ private void testPermutation( byte[] unfragmented, ByteBuf[] fragments ) throws when( boltChannel.rawChannel() ).thenReturn( ch ); when( boltChannel.log() ).thenReturn( NullBoltMessageLogger.getInstance() ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, - new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, + BoltMessagingProtocolHandlerImpl protocol = new BoltMessagingProtocolHandlerImpl( boltChannel, + new SynchronousBoltWorker( machine ), neo4jPack, TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); // When data arrives split up according to the current permutation diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java index f3cca61846a6..34301dfce124 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java @@ -24,6 +24,11 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; import org.neo4j.bolt.logging.BoltMessageLogging; import org.neo4j.bolt.transport.BoltHandshakeProtocolHandler; @@ -31,9 +36,12 @@ import org.neo4j.bolt.transport.BoltProtocolHandlerFactory; import org.neo4j.bolt.transport.SocketTransportHandler; import org.neo4j.bolt.transport.TransportThrottleGroup; +import org.neo4j.bolt.v1.messaging.Neo4jPack; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltStateMachine; import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; -import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; +import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl; +import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.LogProvider; @@ -41,18 +49,29 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.junit.runners.Parameterized.Parameter; +import static org.junit.runners.Parameterized.Parameters; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.logging.AssertableLogProvider.inLog; +@RunWith( Parameterized.class ) public class SocketTransportHandlerTest { private static final LogProvider LOG_PROVIDER = NullLogProvider.getInstance(); private static final BoltMessageLogging BOLT_LOGGING = BoltMessageLogging.none(); + @Parameter + public Neo4jPack neo4jPack; + + @Parameters( name = "{0}" ) + public static List parameters() + { + return Arrays.asList( new Neo4jPackV1(), new Neo4jPackV2() ); + } + @Test public void shouldCloseProtocolOnChannelInactive() throws Throwable { @@ -202,9 +221,22 @@ private static BoltHandshakeProtocolHandler newHandshakeHandler( BoltStateMachin { BoltProtocolHandlerFactory handlerFactory = ( version, channel ) -> { - assertEquals( 1, version ); - return new BoltMessagingProtocolV1Handler( channel, new SynchronousBoltWorker( machine ), - TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); + Neo4jPack neo4jPack; + if ( version == Neo4jPackV1.VERSION ) + { + neo4jPack = new Neo4jPackV1(); + } + else if ( version == Neo4jPackV2.VERSION ) + { + neo4jPack = new Neo4jPackV2(); + } + else + { + throw new IllegalArgumentException( "Unknown version: " + version ); + } + + return new BoltMessagingProtocolHandlerImpl( channel, new SynchronousBoltWorker( machine ), + neo4jPack, TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); }; return new BoltHandshakeProtocolHandler( handlerFactory, false, true ); @@ -214,7 +246,7 @@ private ByteBuf handshake() { ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(); buf.writeInt( 0x6060B017 ); - buf.writeInt( 1 ); + buf.writeInt( neo4jPack.version() ); buf.writeInt( 0 ); buf.writeInt( 0 ); buf.writeInt( 0 ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2HandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2HandlerTest.java deleted file mode 100644 index 0c069ee75fa2..000000000000 --- a/community/bolt/src/test/java/org/neo4j/bolt/v2/transport/BoltMessagingProtocolV2HandlerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v2.transport; - -import org.junit.Test; - -import org.neo4j.bolt.BoltChannel; -import org.neo4j.bolt.transport.BoltMessagingProtocolHandler; -import org.neo4j.bolt.transport.TransportThrottleGroup; -import org.neo4j.bolt.v1.runtime.BoltWorker; -import org.neo4j.kernel.impl.logging.NullLogService; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.RETURNS_MOCKS; -import static org.mockito.Mockito.mock; - -public class BoltMessagingProtocolV2HandlerTest -{ - @Test - public void shouldHaveCorrectVersion() - { - BoltMessagingProtocolHandler handler = new BoltMessagingProtocolV2Handler( - mock( BoltChannel.class, RETURNS_MOCKS ), mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE, - NullLogService.getInstance() ); - - assertEquals( 2, handler.version() ); - } -}