Skip to content

Commit

Permalink
Bolt V2 with point type support
Browse files Browse the repository at this point in the history
Introduced an extension of Bolt V1 with one additional type - point.
New protocol version is exactly the same as the previous one but is
able to serialize/deserialize points. Arbitrary dimensional points
are supported. Serialization format is generic and allows `double[]`
array as point coordinate.

Made protocol test utilities able to work with different protocol
versions.
  • Loading branch information
lutovich committed Feb 21, 2018
1 parent 33d9a15 commit dc11f57
Show file tree
Hide file tree
Showing 33 changed files with 934 additions and 408 deletions.
Expand Up @@ -20,12 +20,13 @@
package org.neo4j.bolt.transport;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.runtime.BoltChannelAutoReadLimiter;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.WorkerFactory;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler;
import org.neo4j.bolt.v2.transport.BoltMessagingProtocolV2Handler;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

public class DefaultBoltProtocolHandlerFactory implements BoltProtocolHandlerFactory
{
Expand All @@ -46,14 +47,22 @@ public BoltMessagingProtocolHandler create( long protocolVersion, BoltChannel ch
{
if ( protocolVersion == BoltMessagingProtocolV1Handler.VERSION_NUMBER )
{
BoltChannelAutoReadLimiter limiter =
new BoltChannelAutoReadLimiter( channel.rawChannel(), logService.getInternalLog( BoltChannelAutoReadLimiter.class ) );
BoltWorker worker = workerFactory.newWorker( channel, limiter );
return new BoltMessagingProtocolV1Handler( channel, new Neo4jPackV1(), worker, throttleGroup, logService );
return new BoltMessagingProtocolV1Handler( channel, newBoltWorker( channel ), throttleGroup, logService );
}
else if ( protocolVersion == BoltMessagingProtocolV2Handler.VERSION )
{
return new BoltMessagingProtocolV2Handler( channel, newBoltWorker( channel ), throttleGroup, logService );
}
else
{
return null;
}
}

private BoltWorker newBoltWorker( BoltChannel channel )
{
Log log = logService.getInternalLog( BoltChannelAutoReadLimiter.class );
BoltChannelAutoReadLimiter limiter = new BoltChannelAutoReadLimiter( channel.rawChannel(), log );
return workerFactory.newWorker( channel, limiter );
}
}
Expand Up @@ -61,16 +61,16 @@ public class Neo4jPackV1 implements Neo4jPack
@Override
public Neo4jPack.Packer newPacker( PackOutput output )
{
return new Packer( output );
return new PackerV1( output );
}

@Override
public Neo4jPack.Unpacker newUnpacker( PackInput input )
{
return new Unpacker( input );
return new UnpackerV1( input );
}

private static class Packer extends PackStream.Packer implements AnyValueWriter<IOException>, Neo4jPack.Packer
protected static class PackerV1 extends PackStream.Packer implements AnyValueWriter<IOException>, Neo4jPack.Packer
{
private Error error;
private static final int INITIAL_PATH_CAPACITY = 500;
Expand All @@ -80,7 +80,7 @@ private static class Packer extends PackStream.Packer implements AnyValueWriter<
private final PrimitiveLongIntKeyValueArray relationshipIndexes =
new PrimitiveLongIntKeyValueArray( INITIAL_PATH_CAPACITY );

Packer( PackOutput output )
protected PackerV1( PackOutput output )
{
super( output );
}
Expand Down Expand Up @@ -442,11 +442,11 @@ public void writeByteArray( byte[] value ) throws IOException
}
}

private static class Unpacker extends PackStream.Unpacker implements Neo4jPack.Unpacker
protected static class UnpackerV1 extends PackStream.Unpacker implements Neo4jPack.Unpacker
{
private List<Neo4jError> errors = new ArrayList<>( 2 );
private final List<Neo4jError> errors = new ArrayList<>( 2 );

Unpacker( PackInput input )
protected UnpackerV1( PackInput input )
{
super( input );
}
Expand Down Expand Up @@ -483,28 +483,7 @@ public AnyValue unpack() throws IOException
{
unpackStructHeader();
char signature = unpackStructSignature();
switch ( signature )
{
case NODE:
{
throw new BoltIOException( Status.Request.Invalid, "Nodes cannot be unpacked." );
}
case RELATIONSHIP:
{
throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." );
}
case UNBOUND_RELATIONSHIP:
{
throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." );
}
case PATH:
{
throw new BoltIOException( Status.Request.Invalid, "Paths cannot be unpacked." );
}
default:
throw new BoltIOException( Status.Request.InvalidFormat,
"Unknown struct type: " + Integer.toHexString( signature ) );
}
return unpackStruct( signature );
}
case END_OF_STREAM:
{
Expand Down Expand Up @@ -554,6 +533,32 @@ else if ( size == UNKNOWN_SIZE )
}
}

protected AnyValue unpackStruct( char signature ) throws IOException
{
switch ( signature )
{
case NODE:
{
throw new BoltIOException( Status.Request.Invalid, "Nodes cannot be unpacked." );
}
case RELATIONSHIP:
{
throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." );
}
case UNBOUND_RELATIONSHIP:
{
throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." );
}
case PATH:
{
throw new BoltIOException( Status.Request.Invalid, "Paths cannot be unpacked." );
}
default:
throw new BoltIOException( Status.Request.InvalidFormat,
"Unknown struct type: " + Integer.toHexString( signature ) );
}
}

@Override
public MapValue unpackMap() throws IOException
{
Expand Down
Expand Up @@ -26,11 +26,12 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.transport.BoltMessagingProtocolHandler;
import org.neo4j.bolt.transport.TransportThrottleGroup;
import org.neo4j.bolt.v1.messaging.BoltMessageRouter;
import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
Expand All @@ -57,7 +58,13 @@ public class BoltMessagingProtocolV1Handler implements BoltMessagingProtocolHand

private final Log internalLog;

public BoltMessagingProtocolV1Handler( BoltChannel boltChannel, Neo4jPack neo4jPack, BoltWorker worker,
public BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker,
TransportThrottleGroup throttleGroup, LogService logging )
{
this( boltChannel, worker, new Neo4jPackV1(), throttleGroup, logging );
}

protected BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker, Neo4jPack neo4jPack,
TransportThrottleGroup throttleGroup, LogService logging )
{
this.chunkedOutput = new ChunkedOutput( boltChannel.rawChannel(), DEFAULT_OUTPUT_BUFFER_SIZE, throttleGroup );
Expand Down
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v2.messaging;

import java.io.IOException;

import org.neo4j.bolt.v1.messaging.Neo4jPack;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.packstream.PackInput;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.CoordinateReferenceSystem;
import org.neo4j.values.storable.PointValue;

import static org.neo4j.values.storable.Values.doubleArray;
import static org.neo4j.values.storable.Values.pointValue;

public class Neo4jPackV2 extends Neo4jPackV1
{
public static final byte POINT = 'X';

@Override
public Neo4jPack.Packer newPacker( PackOutput output )
{
return new PackerV2( output );
}

@Override
public Neo4jPack.Unpacker newUnpacker( PackInput input )
{
return new UnpackerV2( input );
}

private static class PackerV2 extends Neo4jPackV1.PackerV1
{
PackerV2( PackOutput output )
{
super( output );
}

@Override
public void writePoint( CoordinateReferenceSystem crs, double[] coordinate ) throws IOException
{
packStructHeader( 3, POINT );
pack( crs.getTable().getTableId() );
pack( crs.getCode() );
pack( doubleArray( coordinate ) );
}
}

private static class UnpackerV2 extends Neo4jPackV1.UnpackerV1
{
UnpackerV2( PackInput input )
{
super( input );
}

@Override
protected AnyValue unpackStruct( char signature ) throws IOException
{
if ( signature == POINT )
{
return unpackPoint();
}
return super.unpackStruct( signature );
}

private PointValue unpackPoint() throws IOException
{
int tableId = unpackInteger();
int code = unpackInteger();
CoordinateReferenceSystem crs = CoordinateReferenceSystem.get( tableId, code );
int length = (int) unpackListHeader();
double[] coordinates = new double[length];
for ( int i = 0; i < length; i++ )
{
coordinates[i] = unpackDouble();
}
return pointValue( crs, coordinates );
}
}
}
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v2.transport;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.transport.TransportThrottleGroup;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler;
import org.neo4j.bolt.v2.messaging.Neo4jPackV2;
import org.neo4j.kernel.impl.logging.LogService;

public class BoltMessagingProtocolV2Handler extends BoltMessagingProtocolV1Handler
{
public static final int VERSION = 2;

public BoltMessagingProtocolV2Handler( BoltChannel boltChannel, BoltWorker worker,
TransportThrottleGroup throttleGroup, LogService logging )
{
super( boltChannel, worker, new Neo4jPackV2(), throttleGroup, logging );
}

@Override
public int version()
{
return VERSION;
}
}

0 comments on commit dc11f57

Please sign in to comment.