Skip to content

Commit

Permalink
Added magic preamble to BOLT
Browse files Browse the repository at this point in the history
Changed protocol to do the following:

```
    Client: 0x6060 B017
    Client: <option a> <option b> <option c> <option d>
    Server: <chosen option>
```
  • Loading branch information
pontusmelke committed Dec 5, 2015
1 parent ddeada0 commit 80da2e1
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 28 deletions.
8 changes: 7 additions & 1 deletion community/bolt/src/docs/dev/examples.asciidoc
Expand Up @@ -12,6 +12,7 @@ This illustrates running a simple Cypher query without parameters, and retrievin
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Expand Down Expand Up @@ -59,6 +60,7 @@ Note that these two statements are executed in two individual transactions, impl
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Expand Down Expand Up @@ -128,6 +130,7 @@ This illustrates how the server behaves when a request fails, and the server ign
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Expand Down Expand Up @@ -208,6 +211,7 @@ It will also always contain a description of the type of statement ran - `read`
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Expand Down Expand Up @@ -280,6 +284,7 @@ The resulting query plan is returned at the end of the result stream, with the p
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Expand Down Expand Up @@ -438,14 +443,15 @@ Server: SUCCESS {
----
=== Accessing notifications
When Neo4j executes a statement it may include notifications for the user.
These notifications can be warnings about problematic statements or other valuable information for a client.
These notifications can be warnings about problematic statements or other valuable information for a client.
Unlike failures or errors, notifications do not affect the execution of a statement.

.Notifications
[source,bolt_exchange]
----
# Handshake
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
Server: 00 00 00 01
Expand Down
4 changes: 3 additions & 1 deletion community/bolt/src/docs/dev/transport.asciidoc
Expand Up @@ -37,7 +37,7 @@ IMPORTANT: If TLS is enabled, and no certificate has been specified, Neo4j will
After connecting, a handshake takes place to establish which Bolt protocol version should be used for that connection.
This handshake is a _version-independent_ mini-protocol which is guaranteed to remain the same, regardless of preferred or available protocol versions.

In the handshake, the client proposes up to four protocol versions it supports, in order of preference.
In the handshake, the client fist sends a magic four byte preamble (6060 B017) followed by four protocol versions it supports, in order of preference.
The proposal is always represented as four 32-bit unsigned integers.
Each integer represents a proposed protocol version to use, or zero (`00 00 00 00`) for "none".

Expand All @@ -49,6 +49,7 @@ If none of the proposed protocols are supported, the server responds with zero (
[source,bolt_exchange]
----
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
# Version 1 None None None
Expand All @@ -61,6 +62,7 @@ Server: 00 00 00 01
[source,bolt_exchange]
----
Client: <connect>
Client: 60 60 B0 17
Client: 00 00 00 06 00 00 00 00 00 00 00 00 00 00 00 00
# Version 6 None None None
Expand Down
Expand Up @@ -124,11 +124,16 @@ private void chooseProtocolVersion( ChannelHandlerContext ctx, ByteBuf buffer )
}
return;
case NO_APPLICABLE_PROTOCOL:
buffer.release();
ctx.writeAndFlush( wrappedBuffer( new byte[]{0, 0, 0, 0} ) )
.sync()
.channel()
.close();
return;
case INVALID_HANDSHAKE:
buffer.release();
ctx.close();
return;
case PARTIAL_HANDSHAKE:
}
}
Expand All @@ -139,21 +144,24 @@ public enum HandshakeOutcome
PROTOCOL_CHOSEN,
/** Pending more bytes before handshake can complete */
PARTIAL_HANDSHAKE,
/** the client sent an invalid handshake */
INVALID_HANDSHAKE,
/** None of the clients suggested protocol versions are available :( */
NO_APPLICABLE_PROTOCOL
}

/**
* Manages the state for choosing the protocol version to use.
* The protocol opens with the client sending four suggested protocol versions, in preference order and big endian,
* each a 4-byte unsigned integer. Since that message could get split up along the way, we first gather the
* 16 bytes of data we need, and then choose a protocol to use.
* The protocol opens with the client sending four bytes (0x6060 B017) followed by four suggested protocol
* versions in preference order. All bytes are expected to be big endian, and each of the suggested protocols are
* 4-byte unsigned integers. Since that message could get split up along the way, we first gather the
* 20 bytes of data we need, and then choose a protocol to use.
*/
public static class ProtocolChooser
{
private final PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> availableVersions;
private final ByteBuffer suggestedVersions = ByteBuffer.allocateDirect( 4 * 4 ).order( ByteOrder.BIG_ENDIAN );

private final ByteBuffer handShake = ByteBuffer.allocateDirect( 5 * 4 ).order( ByteOrder.BIG_ENDIAN );
private static final int MAGIC_PREAMBLE = 0x6060B017;
private BoltProtocol protocol;

/**
Expand All @@ -166,27 +174,34 @@ public ProtocolChooser( PrimitiveLongObjectMap<Function<Channel,BoltProtocol>> a

public HandshakeOutcome handleVersionHandshakeChunk( ByteBuf buffer, Channel ch )
{
if ( suggestedVersions.remaining() > buffer.readableBytes() )
if ( handShake.remaining() > buffer.readableBytes() )
{
suggestedVersions.limit( suggestedVersions.position() + buffer.readableBytes() );
buffer.readBytes( suggestedVersions );
suggestedVersions.limit( suggestedVersions.capacity() );
handShake.limit( handShake.position() + buffer.readableBytes() );
buffer.readBytes( handShake );
handShake.limit( handShake.capacity() );
}
else
{
buffer.readBytes( suggestedVersions );
buffer.readBytes( handShake );
}

if ( suggestedVersions.remaining() == 0 )
if ( handShake.remaining() == 0 )
{
suggestedVersions.flip();
for ( int i = 0; i < 4; i++ )
handShake.flip();
//Check so that handshake starts with 0x606 0B017
if ( handShake.getInt() != MAGIC_PREAMBLE )
{
long suggestion = suggestedVersions.getInt() & 0xFFFFFFFFL;
if ( availableVersions.containsKey( suggestion ) )
return HandshakeOutcome.INVALID_HANDSHAKE;
}
else {
for ( int i = 0; i < 4; i++ )
{
protocol = availableVersions.get( suggestion ).apply( ch );
return HandshakeOutcome.PROTOCOL_CHOSEN;
long suggestion = handShake.getInt() & 0xFFFFFFFFL;
if ( availableVersions.containsKey( suggestion ) )
{
protocol = availableVersions.get( suggestion ).apply( ch );
return HandshakeOutcome.PROTOCOL_CHOSEN;
}
}
}

Expand Down
Expand Up @@ -30,6 +30,7 @@

/**
* Client: <connect>
* Client: 60 60 B0 17
* Client: 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00
* Version 1 None None None
* <p/>
Expand Down
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.transport.integration;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Collection;

import org.neo4j.bolt.v1.transport.socket.client.Connection;
import org.neo4j.bolt.v1.transport.socket.client.SecureSocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.SecureWebSocketConnection;
import org.neo4j.helpers.HostnamePort;

import static java.util.Arrays.asList;

@RunWith(Parameterized.class)
public class ConnectionIT
{
@Rule
public ExpectedException exception = ExpectedException.none();
@Rule
public Neo4jWithSocket server = new Neo4jWithSocket();

@Parameterized.Parameter(0)
public Connection connection;

@Parameterized.Parameter(1)
public HostnamePort address;

@Parameterized.Parameters
public static Collection<Object[]> transports()
{
return asList(
new Object[]{
new SecureSocketConnection(),
new HostnamePort( "localhost:7687" )
},
new Object[]{
new SecureWebSocketConnection(),
new HostnamePort( "localhost:7688" )
} );
}

@Test
public void shouldCloseConnectionOnInvalidHandshake() throws Exception
{
// GIVEN
connection.connect( address);


// WHEN
connection.send( new byte[]{(byte)0xDE, (byte) 0xAD, (byte) 0xB0, (byte) 0x17, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} );

// THEN
exception.expect( IOException.class );
connection.recv( 4 );
}
}
Expand Up @@ -99,7 +99,8 @@ public static byte[] chunk( int chunkSize, byte[] ... messages )

public static byte[] acceptedVersions( long option1, long option2, long option3, long option4 )
{
ByteBuffer bb = ByteBuffer.allocate( 4 * 4 ).order( BIG_ENDIAN );
ByteBuffer bb = ByteBuffer.allocate( 5 * 4 ).order( BIG_ENDIAN );
bb.putInt( 0x6060B017 );
bb.putInt( (int) option1 );
bb.putInt( (int) option2 );
bb.putInt( (int) option3 );
Expand Down
Expand Up @@ -36,6 +36,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.bolt.transport.SocketTransportHandler.HandshakeOutcome;
import static org.neo4j.bolt.transport.SocketTransportHandler.HandshakeOutcome.INVALID_HANDSHAKE;
import static org.neo4j.bolt.transport.SocketTransportHandler.HandshakeOutcome.NO_APPLICABLE_PROTOCOL;
import static org.neo4j.bolt.transport.SocketTransportHandler.HandshakeOutcome.PARTIAL_HANDSHAKE;
import static org.neo4j.bolt.transport.SocketTransportHandler.HandshakeOutcome.PROTOCOL_CHOSEN;
Expand All @@ -60,6 +61,7 @@ public void shouldChooseFirstAvailableProtocol() throws Throwable
// When
HandshakeOutcome outcome =
chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
(byte) 0x60, (byte) 0x60, (byte) 0xB0, (byte) 0x17,
0, 0, 0, 0,
0, 0, 0, 1,
0, 0, 0, 0,
Expand All @@ -81,16 +83,23 @@ public void shouldHandleFragmentedMessage() throws Throwable

// When
HandshakeOutcome firstOutcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
(byte) 0x60, (byte) 0x60} ), ch );
// When
HandshakeOutcome secondOutcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
(byte) 0xB0, (byte) 0x17 } ), ch );
HandshakeOutcome thirdOutcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
0, 0, 0, 0,
0, 0, 0} ), ch );
HandshakeOutcome secondOutcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
HandshakeOutcome fourthOutcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
1,
0, 0, 0, 0,
0, 0, 0, 0} ), ch );

// Then
assertThat( firstOutcome, equalTo( PARTIAL_HANDSHAKE ) );
assertThat( secondOutcome, equalTo( PROTOCOL_CHOSEN ) );
assertThat( secondOutcome, equalTo( PARTIAL_HANDSHAKE ) );
assertThat( thirdOutcome, equalTo( PARTIAL_HANDSHAKE ) );
assertThat( fourthOutcome, equalTo( PROTOCOL_CHOSEN ) );
assertThat( chooser.chosenProtocol(), equalTo( protocol ) );
}

Expand All @@ -105,6 +114,7 @@ public void shouldHandleHandshakeFollowedByMessageInSameBuffer() throws Throwabl

// When
ByteBuf buffer = wrappedBuffer( new byte[]{
(byte) 0x60, (byte) 0x60, (byte) 0xB0, (byte) 0x17,
0, 0, 0, 0,
0, 0, 0, 1,
0, 0, 0, 0,
Expand Down Expand Up @@ -132,6 +142,7 @@ public void shouldHandleVersionBoundary() throws Throwable

// When
HandshakeOutcome outcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
(byte) 0x60, (byte) 0x60, (byte) 0xB0, (byte) 0x17,
(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
0, 0, 0, 0,
0, 0, 0, 0,
Expand All @@ -154,6 +165,7 @@ public void shouldFallBackToNoneProtocolIfNoMatch() throws Throwable

// When
HandshakeOutcome outcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
(byte) 0x60, (byte) 0x60, (byte) 0xB0, (byte) 0x17,
0, 0, 0, 0,
0, 0, 0, 2,
0, 0, 0, 3,
Expand All @@ -163,4 +175,27 @@ public void shouldFallBackToNoneProtocolIfNoMatch() throws Throwable
assertThat( outcome, equalTo( NO_APPLICABLE_PROTOCOL ) );
assertThat( chooser.chosenProtocol(), nullValue() );
}

@Test
public void shouldRejectIfInvalidHandshake() throws Throwable
{
// Given
when( factory.apply( ch ) ).thenReturn( protocol );

available.put( 1, mock( Function.class ) );

ProtocolChooser chooser = new ProtocolChooser( available );

// When
HandshakeOutcome outcome = chooser.handleVersionHandshakeChunk( wrappedBuffer( new byte[]{
(byte) 0xDE, (byte) 0xAD, (byte) 0xB0, (byte) 0x17,
0, 0, 0, 1,
0, 0, 0, 0,
0, 0, 0, 0,
0, 0, 0, 0} ), ch );

// Then
assertThat( outcome, equalTo( INVALID_HANDSHAKE ) );
assertThat( chooser.chosenProtocol(), nullValue() );
}
}
Expand Up @@ -110,11 +110,12 @@ private SocketTransportHandler.ProtocolChooser protocolChooser( final Session se
private ByteBuf handshake()
{
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer();
buf.writeInt( 0x6060B017 );
buf.writeInt( 1 );
buf.writeInt( 0 );
buf.writeInt( 0 );
buf.writeInt( 0 );
return buf;
}

}
}

0 comments on commit 80da2e1

Please sign in to comment.