Skip to content

Commit

Permalink
Moved bolt version into Neo4jPack & better test parameterization
Browse files Browse the repository at this point in the history
`Neo4jPack` is currently the only versioned part of the bolt server
stack. This commit moves versioning into it and makes all ITs in the
bolt module use handshakes with parameterized versions.
  • Loading branch information
lutovich committed Feb 21, 2018
1 parent e513f5c commit 43236b9
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 229 deletions.
Expand Up @@ -20,11 +20,13 @@
package org.neo4j.bolt.transport;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.runtime.BoltChannelAutoReadLimiter;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.WorkerFactory;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler;
import org.neo4j.bolt.v2.transport.BoltMessagingProtocolV2Handler;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl;
import org.neo4j.bolt.v2.messaging.Neo4jPackV2;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

Expand All @@ -45,20 +47,25 @@ public DefaultBoltProtocolHandlerFactory( WorkerFactory workerFactory, Transport
@Override
public BoltMessagingProtocolHandler create( long protocolVersion, BoltChannel channel )
{
if ( protocolVersion == BoltMessagingProtocolV1Handler.VERSION_NUMBER )
if ( protocolVersion == Neo4jPackV1.VERSION )
{
return new BoltMessagingProtocolV1Handler( channel, newBoltWorker( channel ), throttleGroup, logService );
return newMessagingProtocolHandler( channel, new Neo4jPackV1() );
}
else if ( protocolVersion == BoltMessagingProtocolV2Handler.VERSION )
else if ( protocolVersion == Neo4jPackV2.VERSION )
{
return new BoltMessagingProtocolV2Handler( channel, newBoltWorker( channel ), throttleGroup, logService );
return newMessagingProtocolHandler( channel, new Neo4jPackV2() );
}
else
{
return null;
}
}

private BoltMessagingProtocolHandler newMessagingProtocolHandler( BoltChannel channel, Neo4jPack neo4jPack )
{
return new BoltMessagingProtocolHandlerImpl( channel, newBoltWorker( channel ), neo4jPack, throttleGroup, logService );
}

private BoltWorker newBoltWorker( BoltChannel channel )
{
Log log = logService.getInternalLog( BoltChannelAutoReadLimiter.class );
Expand Down
Expand Up @@ -70,4 +70,6 @@ interface Unpacker
Packer newPacker( PackOutput output );

Unpacker newUnpacker( PackInput input );

int version();
}
Expand Up @@ -53,6 +53,8 @@
*/
public class Neo4jPackV1 implements Neo4jPack
{
public static final int VERSION = 1;

public static final byte NODE = 'N';
public static final byte RELATIONSHIP = 'R';
public static final byte UNBOUND_RELATIONSHIP = 'r';
Expand All @@ -70,6 +72,18 @@ public Neo4jPack.Unpacker newUnpacker( PackInput input )
return new UnpackerV1( input );
}

@Override
public int version()
{
return VERSION;
}

@Override
public String toString()
{
return getClass().getSimpleName();
}

protected static class PackerV1 extends PackStream.Packer implements AnyValueWriter<IOException>, Neo4jPack.Packer
{
private Error error;
Expand Down
Expand Up @@ -26,12 +26,11 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.transport.BoltMessagingProtocolHandler;
import org.neo4j.bolt.transport.TransportThrottleGroup;
import org.neo4j.bolt.v1.messaging.BoltMessageRouter;
import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
Expand All @@ -42,31 +41,25 @@
* <p/>
* Versions of the framing protocol are lock-step with the messaging protocol versioning.
*/
public class BoltMessagingProtocolV1Handler implements BoltMessagingProtocolHandler
public class BoltMessagingProtocolHandlerImpl implements BoltMessagingProtocolHandler
{
public static final int VERSION_NUMBER = 1;

private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 8192;

private final ChunkedOutput chunkedOutput;
private final BoltResponseMessageWriter packer;
private final BoltV1Dechunker dechunker;
private final Neo4jPack neo4jPack;

private final BoltWorker worker;

private final AtomicInteger inFlight = new AtomicInteger( 0 );

private final Log internalLog;

public BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker,
TransportThrottleGroup throttleGroup, LogService logging )
{
this( boltChannel, worker, new Neo4jPackV1(), throttleGroup, logging );
}

protected BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker, Neo4jPack neo4jPack,
public BoltMessagingProtocolHandlerImpl( BoltChannel boltChannel, BoltWorker worker, Neo4jPack neo4jPack,
TransportThrottleGroup throttleGroup, LogService logging )
{
this.neo4jPack = neo4jPack;
this.chunkedOutput = new ChunkedOutput( boltChannel.rawChannel(), DEFAULT_OUTPUT_BUFFER_SIZE, throttleGroup );
this.packer = new BoltResponseMessageWriter(
neo4jPack.newPacker( chunkedOutput ), chunkedOutput, boltChannel.log() );
Expand Down Expand Up @@ -105,7 +98,7 @@ public void handle( ChannelHandlerContext channelContext, ByteBuf data )
@Override
public int version()
{
return VERSION_NUMBER;
return neo4jPack.version();
}

@Override
Expand Down
Expand Up @@ -36,6 +36,8 @@

public class Neo4jPackV2 extends Neo4jPackV1
{
public static final int VERSION = 2;

public static final byte POINT_2D = 'X';
public static final byte POINT_3D = 'Y';

Expand All @@ -51,6 +53,12 @@ public Neo4jPack.Unpacker newUnpacker( PackInput input )
return new UnpackerV2( input );
}

@Override
public int version()
{
return VERSION;
}

private static class PackerV2 extends Neo4jPackV1.PackerV1
{
PackerV2( PackOutput output )
Expand Down

This file was deleted.

Expand Up @@ -114,6 +114,6 @@ protected void reconnect() throws Exception

private static String newName( Class<? extends TransportConnection> connectionClass, Neo4jPack neo4jPack )
{
return connectionClass.getSimpleName() + " & " + neo4jPack.getClass().getSimpleName();
return connectionClass.getSimpleName() + " & " + neo4jPack;
}
}
Expand Up @@ -25,16 +25,14 @@

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.logging.NullBoltMessageLogger;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.WorkerFactory;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler;
import org.neo4j.bolt.v2.transport.BoltMessagingProtocolV2Handler;
import org.neo4j.bolt.v2.messaging.Neo4jPackV2;
import org.neo4j.kernel.impl.logging.NullLogService;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.RETURNS_MOCKS;
Expand All @@ -48,13 +46,13 @@ public class DefaultBoltProtocolHandlerFactoryTest
@Test
public void shouldCreateV1Handler()
{
testHandlerCreation( BoltMessagingProtocolV1Handler.VERSION, BoltMessagingProtocolV1Handler.class );
testHandlerCreation( Neo4jPackV1.VERSION );
}

@Test
public void shouldCreateV2Handler()
{
testHandlerCreation( BoltMessagingProtocolV2Handler.VERSION, BoltMessagingProtocolV2Handler.class );
testHandlerCreation( Neo4jPackV2.VERSION );
}

@Test
Expand All @@ -71,8 +69,7 @@ public void shouldCreateNothingForUnknownProtocolVersion()
assertNull( handler );
}

private static void testHandlerCreation( int protocolVersion,
Class<? extends BoltMessagingProtocolHandler> expectedHandlerClass )
private static void testHandlerCreation( int protocolVersion )
{
BoltChannel boltChannel = BoltChannel.open( newChannelCtxMock(), NullBoltMessageLogger.getInstance() );
WorkerFactory workerFactory = mock( WorkerFactory.class );
Expand All @@ -85,8 +82,7 @@ private static void testHandlerCreation( int protocolVersion,

BoltMessagingProtocolHandler handler = factory.create( protocolVersion, boltChannel );

// correct handler handler is created
assertThat( handler, instanceOf( expectedHandlerClass ) );
// handler with correct version is created
assertEquals( protocolVersion, handler.version() );
// it uses the expected worker
verify( workerFactory ).newWorker( same( boltChannel ), any() );
Expand Down
Expand Up @@ -34,7 +34,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket.DEFAULT_CONNECTOR_KEY;
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyDisconnects;
import static org.neo4j.bolt.v1.transport.integration.TransportTestUtil.eventuallyReceives;
import static org.neo4j.kernel.configuration.BoltConnector.EncryptionLevel.REQUIRED;

public class BoltConfigIT extends AbstractBoltTransportsTest
Expand Down Expand Up @@ -69,16 +68,16 @@ public void shouldSupportMultipleConnectors() throws Throwable
private void assertConnectionRejected( HostnamePort address, TransportConnection client ) throws Exception
{
client.connect( address )
.send( util.acceptedVersions( 1, 0, 0, 0 ) );
.send( util.defaultAcceptedVersions() );

assertThat( client, eventuallyDisconnects() );
}

private void assertConnectionAccepted( HostnamePort address, TransportConnection client ) throws Exception
{
client.connect( address )
.send( util.acceptedVersions( 1, 0, 0, 0 ) )
.send( util.defaultAcceptedVersions() )
.send( util.chunk( InitMessage.init( "TestClient/1.1", emptyMap() ) ) );
assertThat( client, eventuallyReceives( new byte[]{0, 0, 0, 1} ) );
assertThat( client, util.eventuallyReceivesSelectedProtocolVersion() );
}
}
Expand Up @@ -30,10 +30,11 @@

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.transport.BoltMessagingProtocolHandler;
import org.neo4j.bolt.transport.TransportThrottleGroup;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.runtime.BoltStateMachine;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.logging.AssertableLogProvider;
Expand All @@ -49,9 +50,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.bolt.transport.TransportThrottleGroup.NO_THROTTLE;
import static org.neo4j.logging.AssertableLogProvider.inLog;

public class BoltMessagingProtocolV1HandlerTest
public class BoltMessagingProtocolHandlerImplTest
{
@Test
public void shouldNotTalkToChannelDirectlyOnFatalError()
Expand All @@ -62,9 +64,7 @@ public void shouldNotTalkToChannelDirectlyOnFatalError()
when( boltChannel.rawChannel() ).thenReturn( outputChannel );

BoltStateMachine machine = mock( BoltStateMachine.class );
BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel,
new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE,
NullLogService.getInstance() );
BoltMessagingProtocolHandlerImpl protocol = newHandler( boltChannel, new SynchronousBoltWorker( machine ) );
verify( outputChannel ).alloc();

// And given inbound data that'll explode when the protocol tries to interpret it
Expand Down Expand Up @@ -96,9 +96,7 @@ public void closesInputAndOutput()
BoltChannel boltChannel = mock( BoltChannel.class );
when( boltChannel.rawChannel() ).thenReturn( outputChannel );

BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel,
new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE,
NullLogService.getInstance() );
BoltMessagingProtocolHandlerImpl protocol = newHandler( boltChannel, new SynchronousBoltWorker( machine ) );
protocol.close();

verify( machine ).close();
Expand All @@ -118,23 +116,20 @@ public void messageProcessingErrorIsLogged()
BoltChannel boltChannel = mock( BoltChannel.class );
when( boltChannel.rawChannel() ).thenReturn( outputChannel );

BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel,
mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE, logService );
BoltMessagingProtocolHandlerImpl protocol = newHandler( boltChannel, mock( BoltWorker.class ), logService );

protocol.handle( mock( ChannelHandlerContext.class ), data );

assertableLogProvider.assertExactly(
inLog( BoltMessagingProtocolV1Handler.class ).error(
inLog( BoltMessagingProtocolHandlerImpl.class ).error(
equalTo( "Failed to handle incoming Bolt message. Connection will be closed." ),
equalTo( error ) ) );
}

@Test
public void shouldHaveCorrectVersion()
{
BoltMessagingProtocolHandler handler = new BoltMessagingProtocolV1Handler(
mock( BoltChannel.class, RETURNS_MOCKS ), mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE,
NullLogService.getInstance() );
BoltMessagingProtocolHandler handler = newHandler( mock( BoltChannel.class, RETURNS_MOCKS ), mock( BoltWorker.class ) );

assertEquals( 1, handler.version() );
}
Expand All @@ -157,4 +152,14 @@ private static Channel newChannelMock()
when( channel.alloc() ).thenReturn( allocator );
return channel;
}

private static BoltMessagingProtocolHandlerImpl newHandler( BoltChannel channel, BoltWorker worker )
{
return newHandler( channel, worker, NullLogService.getInstance() );
}

private static BoltMessagingProtocolHandlerImpl newHandler( BoltChannel channel, BoltWorker worker, LogService logService )
{
return new BoltMessagingProtocolHandlerImpl( channel, worker, new Neo4jPackV1(), NO_THROTTLE, logService );
}
}

0 comments on commit 43236b9

Please sign in to comment.