From 8f9e85ded34d245933120340732884a123ae22e9 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 4 Jan 2018 13:22:44 +0100 Subject: [PATCH] Extracted interface for Neo4jPack To make it easier to plug different version that, for example, supports new types, like point, geometry or date. --- .../DefaultBoltProtocolHandlerFactory.java | 3 +- .../messaging/BoltResponseMessageWriter.java | 43 -- .../neo4j/bolt/v1/messaging/Neo4jPack.java | 627 +--------------- .../neo4j/bolt/v1/messaging/Neo4jPackV1.java | 667 ++++++++++++++++++ .../BoltMessagingProtocolV1Handler.java | 6 +- .../bolt/v1/transport/BoltV1Dechunker.java | 5 +- .../v1/messaging/BoltRequestMessageTest.java | 5 +- .../v1/messaging/BoltResponseMessageTest.java | 5 +- ...eo4jPackTest.java => Neo4jPackV1Test.java} | 61 +- .../v1/messaging/util/MessageMatchers.java | 38 +- .../BoltMessagingProtocolV1HandlerTest.java | 15 +- .../v1/transport/BoltV1DechunkerTest.java | 3 +- .../socket/FragmentedMessageDeliveryTest.java | 9 +- .../socket/SocketTransportHandlerTest.java | 3 +- 14 files changed, 775 insertions(+), 715 deletions(-) create mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java rename community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/{Neo4jPackTest.java => Neo4jPackV1Test.java} (81%) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java index 408a18881e40..13787c5b85e0 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactory.java @@ -20,6 +20,7 @@ package org.neo4j.bolt.transport; import org.neo4j.bolt.BoltChannel; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.WorkerFactory; import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; @@ -45,7 +46,7 @@ public BoltMessagingProtocolHandler create( long protocolVersion, BoltChannel ch if ( protocolVersion == BoltMessagingProtocolV1Handler.VERSION ) { BoltWorker worker = workerFactory.newWorker( channel ); - return new BoltMessagingProtocolV1Handler( channel, worker, throttleGroup, logService ); + return new BoltMessagingProtocolV1Handler( channel, new Neo4jPackV1(), worker, throttleGroup, logService ); } else { diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java index 0270c7bf9c85..94cf428f6dca 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageWriter.java @@ -20,18 +20,11 @@ package org.neo4j.bolt.v1.messaging; import java.io.IOException; -import java.util.function.Supplier; import org.neo4j.bolt.logging.BoltMessageLogger; import org.neo4j.cypher.result.QueryResult; -import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.Relationship; -import org.neo4j.graphdb.spatial.Point; -import org.neo4j.kernel.impl.util.BaseToObjectValueWriter; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.values.AnyValue; -import org.neo4j.values.utils.PrettyPrinter; -import org.neo4j.values.storable.CoordinateReferenceSystem; import org.neo4j.values.virtual.MapValue; import static org.neo4j.bolt.v1.messaging.BoltResponseMessage.FAILURE; @@ -92,16 +85,6 @@ public void onSuccess( MapValue metadata ) throws IOException onMessageComplete.onMessageComplete(); } - private Supplier metadataSupplier( MapValue metadata ) - { - return () -> - { - PrettyPrinter printer = new PrettyPrinter(); - metadata.writeTo( printer ); - return printer.value(); - }; - } - @Override public void onIgnored() throws IOException { @@ -138,30 +121,4 @@ public void flush() throws IOException { packer.flush(); } - - private class MapToObjectWriter extends BaseToObjectValueWriter - { - - private UnsupportedOperationException exception = - new UnsupportedOperationException( "Functionality not implemented." ); - - @Override - protected Node newNodeProxyById( long id ) - { - throw exception; - } - - @Override - protected Relationship newRelationshipProxyById( long id ) - { - throw exception; - } - - @Override - protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate ) - { - throw exception; - } - } - } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java index 2b3e3da0b900..7d6ca170b8e9 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPack.java @@ -20,641 +20,58 @@ package org.neo4j.bolt.v1.messaging; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import org.neo4j.bolt.v1.packstream.PackInput; import org.neo4j.bolt.v1.packstream.PackOutput; -import org.neo4j.bolt.v1.packstream.PackStream; -import org.neo4j.bolt.v1.packstream.PackType; import org.neo4j.bolt.v1.runtime.Neo4jError; -import org.neo4j.collection.primitive.PrimitiveLongIntKeyValueArray; -import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.Relationship; -import org.neo4j.graphdb.spatial.Point; -import org.neo4j.kernel.api.exceptions.Status; -import org.neo4j.kernel.impl.util.BaseToObjectValueWriter; import org.neo4j.values.AnyValue; -import org.neo4j.values.AnyValueWriter; -import org.neo4j.values.storable.TextArray; -import org.neo4j.values.storable.TextValue; -import org.neo4j.values.storable.Values; -import org.neo4j.values.storable.CoordinateReferenceSystem; -import org.neo4j.values.virtual.EdgeValue; -import org.neo4j.values.virtual.ListValue; import org.neo4j.values.virtual.MapValue; -import org.neo4j.values.virtual.NodeValue; -import org.neo4j.values.virtual.VirtualValues; -import static org.neo4j.bolt.v1.packstream.PackStream.UNKNOWN_SIZE; -import static org.neo4j.values.storable.Values.byteArray; - -/** - * Extended PackStream packer and unpacker classes for working - * with Neo4j-specific data types, represented as structures. - */ -public class Neo4jPack +public interface Neo4jPack { - private static final List EMPTY_LIST = new ArrayList<>(); - private static final Map EMPTY_MAP = new HashMap<>(); - - public static final byte NODE = 'N'; - public static final byte RELATIONSHIP = 'R'; - public static final byte UNBOUND_RELATIONSHIP = 'r'; - public static final byte PATH = 'P'; - - public static class Packer extends PackStream.Packer implements AnyValueWriter + interface Packer { - private Error error; - private static final int INITIAL_PATH_CAPACITY = 500; - private static final int NO_SUCH_ID = -1; - private final PrimitiveLongIntKeyValueArray nodeIndexes = - new PrimitiveLongIntKeyValueArray( INITIAL_PATH_CAPACITY + 1 ); - private final PrimitiveLongIntKeyValueArray edgeIndexes = - new PrimitiveLongIntKeyValueArray( INITIAL_PATH_CAPACITY ); - - public Packer( PackOutput output ) - { - super( output ); - } - - public void pack( AnyValue value ) throws IOException - { - value.writeTo( this ); - } - - public void packRawMap( MapValue map ) throws IOException - { - packMapHeader( map.size() ); - for ( Map.Entry entry : map.entrySet() ) - { - pack( entry.getKey() ); - pack( entry.getValue() ); - } - } - - void consumeError() throws BoltIOException - { - if ( error != null ) - { - BoltIOException exception = new BoltIOException( error.status(), error.msg() ); - error = null; - throw exception; - } - } - - public boolean hasErrors() - { - return error != null; - } - - @Override - public void writeNodeReference( long nodeId ) throws IOException - { - throw new UnsupportedOperationException( "Cannot write a raw node reference" ); - } - - @Override - public void writeNode( long nodeId, TextArray labels, MapValue properties ) throws IOException - { - packStructHeader( 3, Neo4jPack.NODE ); - pack( nodeId ); - packListHeader( labels.length() ); - for ( int i = 0; i < labels.length(); i++ ) - { - labels.value( i ).writeTo( this ); - } - properties.writeTo( this ); - } - - @Override - public void writeEdgeReference( long edgeId ) throws IOException - { - throw new UnsupportedOperationException( "Cannot write a raw edge reference" ); - } - - @Override - public void writeEdge( long edgeId, long startNodeId, long endNodeId, TextValue type, MapValue properties ) - throws IOException - { - packStructHeader( 5, Neo4jPack.RELATIONSHIP ); - pack( edgeId ); - pack( startNodeId ); - pack( endNodeId ); - type.writeTo( this ); - properties.writeTo( this ); - } - - @Override - public void beginMap( int size ) throws IOException - { - packMapHeader( size ); - } - - @Override - public void endMap() throws IOException - { - //do nothing - } - - @Override - public void beginList( int size ) throws IOException - { - packListHeader( size ); - } - - @Override - public void endList() throws IOException - { - //do nothing - } - - @Override - public void writePath( NodeValue[] nodes, EdgeValue[] edges ) throws IOException - { - //A path is serialized in the following form - // Given path: (a {id: 42})-[r1 {id: 10}]->(b {id: 43})<-[r1 {id: 11}]-(c {id: 44}) - //The serialization will look like: - // - // { - // [a, b, c] - // [r1, r2] - // [1, 1, -2, 2] - // } - // The first list contains all nodes where the first node (a) is guaranteed to be the start node of - // the path - // The second list contains all edges of the path - // The third list defines the path order, where every other item specifies the offset into the - // relationship and node list respectively. Since all paths is guaranteed to start with a 0, meaning - // that - // a is the start node in this case, those are excluded. So the first integer in the array refers to the - // position - // in the relationship array (1 indexed where sign denotes direction) and the second one refers to - // the offset - // into the - // node list (zero indexed) and so on. - packStructHeader( 3, Neo4jPack.PATH ); - - writeNodesForPath( nodes ); - writeEdgesForPath( edges ); - - packListHeader( 2 * edges.length ); - if ( edges.length == 0 ) - { - return; - } - - NodeValue node = nodes[0]; - for ( int i = 1; i <= 2 * edges.length; i++ ) - { - if ( i % 2 == 0 ) - { - node = nodes[i / 2]; - int index = nodeIndexes.getOrDefault( node.id(), NO_SUCH_ID ); - pack( index ); - } - else - { - EdgeValue edge = edges[i / 2]; - int index = edgeIndexes.getOrDefault( edge.id(), NO_SUCH_ID ); - - if ( node.id() == edge.startNode().id() ) - { - pack( index ); - } - else - { - pack( -index ); - } - } - - } - } - - private void writeNodesForPath( NodeValue[] nodes ) throws IOException - { - nodeIndexes.reset( nodes.length ); - for ( NodeValue node : nodes ) - { - nodeIndexes.putIfAbsent( node.id(), nodeIndexes.size() ); - } - - int size = nodeIndexes.size(); - packListHeader( size ); - if ( size > 0 ) - { - NodeValue node = nodes[0]; - for ( long id : nodeIndexes.keys() ) - { - int i = 1; - while ( node.id() != id ) - { - node = nodes[i++]; - } - node.writeTo( this ); - } - } - } + void packStructHeader( int size, byte signature ) throws IOException; - private void writeEdgesForPath( EdgeValue[] edges ) throws IOException - { - edgeIndexes.reset( edges.length ); - for ( EdgeValue node : edges ) - { - // relationship indexes are one-based - edgeIndexes.putIfAbsent( node.id(), edgeIndexes.size() + 1 ); - } + void pack( AnyValue value ) throws IOException; - int size = edgeIndexes.size(); - packListHeader( size ); - if ( size > 0 ) - { - { - EdgeValue edge = edges[0]; - for ( long id : edgeIndexes.keys() ) - { - int i = 1; - while ( edge.id() != id ) - { - edge = edges[i++]; - } - //Note that we are not doing edge.writeTo(this) here since the serialization protocol - //requires these to be _unbound relationships_, thus edges without any start node nor - // end node. - packStructHeader( 3, Neo4jPack.UNBOUND_RELATIONSHIP ); - pack( edge.id() ); - edge.type().writeTo( this ); - edge.properties().writeTo( this ); - } - } - } - } + void packRawMap( MapValue map ) throws IOException; - @Override - public void writePoint( CoordinateReferenceSystem crs, double[] coordinate ) throws IOException - { - error = new Error( Status.Request.Invalid, - "Point is not yet supported as a return type in Bolt" ); - packNull(); - } + void packMapHeader( int size ) throws IOException; - @Override - public void writeNull() throws IOException - { - packNull(); - } + void flush() throws IOException; - @Override - public void writeBoolean( boolean value ) throws IOException - { - pack( value ); - } + void pack( String value ) throws IOException; - @Override - public void writeInteger( byte value ) throws IOException - { - pack( value ); - } + void packListHeader( int size ) throws IOException; - @Override - public void writeInteger( short value ) throws IOException - { - pack( value ); - } - - @Override - public void writeInteger( int value ) throws IOException - { - pack( value ); - } - - @Override - public void writeInteger( long value ) throws IOException - { - pack( value ); - } - - @Override - public void writeFloatingPoint( float value ) throws IOException - { - pack( value ); - } - - @Override - public void writeFloatingPoint( double value ) throws IOException - { - pack( value ); - } - - @Override - public void writeUTF8( byte[] bytes, int offset, int length ) throws IOException - { - packUTF8(bytes, offset, length); - } - - @Override - public void writeString( String value ) throws IOException - { - pack( value ); - } - - @Override - public void writeString( char value ) throws IOException - { - pack( value ); - } - - @Override - public void beginArray( int size, ArrayType arrayType ) throws IOException - { - switch ( arrayType ) - { - case BYTE: - packBytesHeader( size ); - break; - default: - packListHeader( size ); - } - - } - - @Override - public void endArray() throws IOException - { - //Do nothing - } - - @Override - public void writeByteArray( byte[] value ) throws IOException - { - pack( value ); - } + void consumeError() throws BoltIOException; } - public static class Unpacker extends PackStream.Unpacker + interface Unpacker { + boolean hasNext() throws IOException; - private List errors = new ArrayList<>( 2 ); - - public Unpacker( PackInput input ) - { - super( input ); - } + long unpackStructHeader() throws IOException; - public AnyValue unpack() throws IOException - { - PackType valType = peekNextType(); - switch ( valType ) - { - case BYTES: - return byteArray( unpackBytes() ); - case STRING: - return Values.utf8Value( unpackUTF8() ); - case INTEGER: - return Values.longValue( unpackLong() ); - case FLOAT: - return Values.doubleValue( unpackDouble() ); - case BOOLEAN: - return Values.booleanValue( unpackBoolean() ); - case NULL: - // still need to move past the null value - unpackNull(); - return Values.NO_VALUE; - case LIST: - { - return unpackList(); - } - case MAP: - { - return unpackMap(); - } - case STRUCT: - { - unpackStructHeader(); - char signature = unpackStructSignature(); - switch ( signature ) - { - case NODE: - { - throw new BoltIOException( Status.Request.Invalid, "Nodes cannot be unpacked." ); - } - case RELATIONSHIP: - { - throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." ); - } - case UNBOUND_RELATIONSHIP: - { - throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." ); - } - case PATH: - { - throw new BoltIOException( Status.Request.Invalid, "Paths cannot be unpacked." ); - } - default: - throw new BoltIOException( Status.Request.InvalidFormat, - "Unknown struct type: " + Integer.toHexString( signature ) ); - } - } - case END_OF_STREAM: - { - unpackEndOfStream(); - return null; - } - default: - throw new BoltIOException( Status.Request.InvalidFormat, - "Unknown value type: " + valType ); - } - } + char unpackStructSignature() throws IOException; - ListValue unpackList() throws IOException - { - int size = (int) unpackListHeader(); - if ( size == 0 ) - { - return VirtualValues.EMPTY_LIST; - } - ArrayList list; - if ( size == UNKNOWN_SIZE ) - { - list = new ArrayList<>(); - boolean more = true; - while ( more ) - { - PackType keyType = peekNextType(); - switch ( keyType ) - { - case END_OF_STREAM: - unpack(); - more = false; - break; - default: - list.add( unpack() ); - } - } - } - else - { - list = new ArrayList<>( size ); - for ( int i = 0; i < size; i++ ) - { - list.add( unpack() ); - } - } - return VirtualValues.list( list.toArray( new AnyValue[list.size()] ) ); - } + String unpackString() throws IOException; - public MapValue unpackMap() throws IOException - { - int size = (int) unpackMapHeader(); - if ( size == 0 ) - { - return VirtualValues.EMPTY_MAP; - } - Map map; - if ( size == UNKNOWN_SIZE ) - { - map = new HashMap<>(); - boolean more = true; - while ( more ) - { - PackType keyType = peekNextType(); - String key; - AnyValue val; - switch ( keyType ) - { - case END_OF_STREAM: - unpack(); - more = false; - break; - case STRING: - key = unpackString(); - val = unpack(); - if ( map.put( key, val ) != null ) - { - errors.add( - Neo4jError.from( Status.Request.Invalid, "Duplicate map key `" + key + "`." ) ); - } - break; - case NULL: - errors.add( Neo4jError.from( Status.Request.Invalid, - "Value `null` is not supported as key in maps, must be a non-nullable string." ) ); - unpackNull(); - val = unpack(); - map.put( null, val ); - break; - default: - throw new PackStream.PackStreamException( "Bad key type" ); - } - } - } - else - { - map = new HashMap<>( size, 1 ); - for ( int i = 0; i < size; i++ ) - { - PackType type = peekNextType(); - String key; - switch ( type ) - { - case NULL: - errors.add( Neo4jError.from( Status.Request.Invalid, - "Value `null` is not supported as key in maps, must be a non-nullable string." ) ); - unpackNull(); - key = null; - break; - case STRING: - key = unpackString(); - break; - default: - throw new PackStream.PackStreamException( "Bad key type: " + type ); - } + Map unpackToRawMap() throws IOException; - AnyValue val = unpack(); - if ( map.put( key, val ) != null ) - { - errors.add( Neo4jError.from( Status.Request.Invalid, "Duplicate map key `" + key + "`." ) ); - } - } - } - return VirtualValues.map( map ); - } + MapValue unpackMap() throws IOException; - public Map unpackToRawMap() throws IOException - { - MapValue mapValue = unpackMap(); - HashMap map = new HashMap<>( mapValue.size() ); - for ( Map.Entry entry : mapValue.entrySet() ) - { - UnpackerWriter unpackerWriter = new UnpackerWriter(); - entry.getValue().writeTo( unpackerWriter ); - map.put( entry.getKey(), unpackerWriter.value() ); - } - return map; - } + long unpackListHeader() throws IOException; - Optional consumeError() - { - if ( errors.isEmpty() ) - { - return Optional.empty(); - } - else - { - Neo4jError combined = Neo4jError.combine( errors ); - errors.clear(); - return Optional.of( combined ); - } - } - } - - private static class Error - { - private final Status status; - private final String msg; - - private Error( Status status, String msg ) - { - this.status = status; - this.msg = msg; - } - - Status status() - { - return status; - } - - String msg() - { - return msg; - } - } + AnyValue unpack() throws IOException; - private Neo4jPack() - { + Optional consumeError(); } - private static class UnpackerWriter extends BaseToObjectValueWriter - { - - @Override - protected Node newNodeProxyById( long id ) - { - throw new UnsupportedOperationException( "Cannot unpack nodes" ); - } - - @Override - protected Relationship newRelationshipProxyById( long id ) - { - throw new UnsupportedOperationException( "Cannot unpack relationships" ); - } + Packer newPacker( PackOutput output ); - @Override - protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate ) - { - throw new UnsupportedOperationException( "Cannot unpack points" ); - } - } + Unpacker newUnpacker( PackInput input ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java new file mode 100644 index 000000000000..f0eaa9571826 --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1.java @@ -0,0 +1,667 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.v1.messaging; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.neo4j.bolt.v1.packstream.PackInput; +import org.neo4j.bolt.v1.packstream.PackOutput; +import org.neo4j.bolt.v1.packstream.PackStream; +import org.neo4j.bolt.v1.packstream.PackType; +import org.neo4j.bolt.v1.runtime.Neo4jError; +import org.neo4j.collection.primitive.PrimitiveLongIntKeyValueArray; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.spatial.Point; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.impl.util.BaseToObjectValueWriter; +import org.neo4j.values.AnyValue; +import org.neo4j.values.AnyValueWriter; +import org.neo4j.values.storable.TextArray; +import org.neo4j.values.storable.TextValue; +import org.neo4j.values.storable.Values; +import org.neo4j.values.storable.CoordinateReferenceSystem; +import org.neo4j.values.virtual.EdgeValue; +import org.neo4j.values.virtual.ListValue; +import org.neo4j.values.virtual.MapValue; +import org.neo4j.values.virtual.NodeValue; +import org.neo4j.values.virtual.VirtualValues; + +import static org.neo4j.bolt.v1.packstream.PackStream.UNKNOWN_SIZE; +import static org.neo4j.values.storable.Values.byteArray; + +/** + * Extended PackStream packer and unpacker classes for working + * with Neo4j-specific data types, represented as structures. + */ +public class Neo4jPackV1 implements Neo4jPack +{ + public static final byte NODE = 'N'; + public static final byte RELATIONSHIP = 'R'; + public static final byte UNBOUND_RELATIONSHIP = 'r'; + public static final byte PATH = 'P'; + + @Override + public Neo4jPack.Packer newPacker( PackOutput output ) + { + return new Packer( output ); + } + + @Override + public Neo4jPack.Unpacker newUnpacker( PackInput input ) + { + return new Unpacker( input ); + } + + private static class Packer extends PackStream.Packer implements AnyValueWriter, Neo4jPack.Packer + { + private Error error; + private static final int INITIAL_PATH_CAPACITY = 500; + private static final int NO_SUCH_ID = -1; + private final PrimitiveLongIntKeyValueArray nodeIndexes = + new PrimitiveLongIntKeyValueArray( INITIAL_PATH_CAPACITY + 1 ); + private final PrimitiveLongIntKeyValueArray edgeIndexes = + new PrimitiveLongIntKeyValueArray( INITIAL_PATH_CAPACITY ); + + Packer( PackOutput output ) + { + super( output ); + } + + @Override + public void pack( AnyValue value ) throws IOException + { + value.writeTo( this ); + } + + @Override + public void packRawMap( MapValue map ) throws IOException + { + packMapHeader( map.size() ); + for ( Map.Entry entry : map.entrySet() ) + { + pack( entry.getKey() ); + pack( entry.getValue() ); + } + } + + @Override + public void consumeError() throws BoltIOException + { + if ( error != null ) + { + BoltIOException exception = new BoltIOException( error.status(), error.msg() ); + error = null; + throw exception; + } + } + + @Override + public void writeNodeReference( long nodeId ) throws IOException + { + throw new UnsupportedOperationException( "Cannot write a raw node reference" ); + } + + @Override + public void writeNode( long nodeId, TextArray labels, MapValue properties ) throws IOException + { + packStructHeader( 3, NODE ); + pack( nodeId ); + packListHeader( labels.length() ); + for ( int i = 0; i < labels.length(); i++ ) + { + labels.value( i ).writeTo( this ); + } + properties.writeTo( this ); + } + + @Override + public void writeEdgeReference( long edgeId ) throws IOException + { + throw new UnsupportedOperationException( "Cannot write a raw edge reference" ); + } + + @Override + public void writeEdge( long edgeId, long startNodeId, long endNodeId, TextValue type, MapValue properties ) + throws IOException + { + packStructHeader( 5, RELATIONSHIP ); + pack( edgeId ); + pack( startNodeId ); + pack( endNodeId ); + type.writeTo( this ); + properties.writeTo( this ); + } + + @Override + public void beginMap( int size ) throws IOException + { + packMapHeader( size ); + } + + @Override + public void endMap() throws IOException + { + //do nothing + } + + @Override + public void beginList( int size ) throws IOException + { + packListHeader( size ); + } + + @Override + public void endList() throws IOException + { + //do nothing + } + + @Override + public void writePath( NodeValue[] nodes, EdgeValue[] edges ) throws IOException + { + //A path is serialized in the following form + // Given path: (a {id: 42})-[r1 {id: 10}]->(b {id: 43})<-[r1 {id: 11}]-(c {id: 44}) + //The serialization will look like: + // + // { + // [a, b, c] + // [r1, r2] + // [1, 1, -2, 2] + // } + // The first list contains all nodes where the first node (a) is guaranteed to be the start node of + // the path + // The second list contains all edges of the path + // The third list defines the path order, where every other item specifies the offset into the + // relationship and node list respectively. Since all paths is guaranteed to start with a 0, meaning + // that + // a is the start node in this case, those are excluded. So the first integer in the array refers to the + // position + // in the relationship array (1 indexed where sign denotes direction) and the second one refers to + // the offset + // into the + // node list (zero indexed) and so on. + packStructHeader( 3, PATH ); + + writeNodesForPath( nodes ); + writeEdgesForPath( edges ); + + packListHeader( 2 * edges.length ); + if ( edges.length == 0 ) + { + return; + } + + NodeValue node = nodes[0]; + for ( int i = 1; i <= 2 * edges.length; i++ ) + { + if ( i % 2 == 0 ) + { + node = nodes[i / 2]; + int index = nodeIndexes.getOrDefault( node.id(), NO_SUCH_ID ); + pack( index ); + } + else + { + EdgeValue edge = edges[i / 2]; + int index = edgeIndexes.getOrDefault( edge.id(), NO_SUCH_ID ); + + if ( node.id() == edge.startNode().id() ) + { + pack( index ); + } + else + { + pack( -index ); + } + } + + } + } + + private void writeNodesForPath( NodeValue[] nodes ) throws IOException + { + nodeIndexes.reset( nodes.length ); + for ( NodeValue node : nodes ) + { + nodeIndexes.putIfAbsent( node.id(), nodeIndexes.size() ); + } + + int size = nodeIndexes.size(); + packListHeader( size ); + if ( size > 0 ) + { + NodeValue node = nodes[0]; + for ( long id : nodeIndexes.keys() ) + { + int i = 1; + while ( node.id() != id ) + { + node = nodes[i++]; + } + node.writeTo( this ); + } + } + } + + private void writeEdgesForPath( EdgeValue[] edges ) throws IOException + { + edgeIndexes.reset( edges.length ); + for ( EdgeValue node : edges ) + { + // relationship indexes are one-based + edgeIndexes.putIfAbsent( node.id(), edgeIndexes.size() + 1 ); + } + + int size = edgeIndexes.size(); + packListHeader( size ); + if ( size > 0 ) + { + { + EdgeValue edge = edges[0]; + for ( long id : edgeIndexes.keys() ) + { + int i = 1; + while ( edge.id() != id ) + { + edge = edges[i++]; + } + //Note that we are not doing edge.writeTo(this) here since the serialization protocol + //requires these to be _unbound relationships_, thus edges without any start node nor + // end node. + packStructHeader( 3, UNBOUND_RELATIONSHIP ); + pack( edge.id() ); + edge.type().writeTo( this ); + edge.properties().writeTo( this ); + } + } + } + } + + @Override + public void writePoint( CoordinateReferenceSystem crs, double[] coordinate ) throws IOException + { + error = new Error( Status.Request.Invalid, + "Point is not yet supported as a return type in Bolt" ); + packNull(); + } + + @Override + public void writeNull() throws IOException + { + packNull(); + } + + @Override + public void writeBoolean( boolean value ) throws IOException + { + pack( value ); + } + + @Override + public void writeInteger( byte value ) throws IOException + { + pack( value ); + } + + @Override + public void writeInteger( short value ) throws IOException + { + pack( value ); + } + + @Override + public void writeInteger( int value ) throws IOException + { + pack( value ); + } + + @Override + public void writeInteger( long value ) throws IOException + { + pack( value ); + } + + @Override + public void writeFloatingPoint( float value ) throws IOException + { + pack( value ); + } + + @Override + public void writeFloatingPoint( double value ) throws IOException + { + pack( value ); + } + + @Override + public void writeUTF8( byte[] bytes, int offset, int length ) throws IOException + { + packUTF8(bytes, offset, length); + } + + @Override + public void writeString( String value ) throws IOException + { + pack( value ); + } + + @Override + public void writeString( char value ) throws IOException + { + pack( value ); + } + + @Override + public void beginArray( int size, ArrayType arrayType ) throws IOException + { + switch ( arrayType ) + { + case BYTE: + packBytesHeader( size ); + break; + default: + packListHeader( size ); + } + + } + + @Override + public void endArray() throws IOException + { + //Do nothing + } + + @Override + public void writeByteArray( byte[] value ) throws IOException + { + pack( value ); + } + } + + private static class Unpacker extends PackStream.Unpacker implements Neo4jPack.Unpacker + { + + private List errors = new ArrayList<>( 2 ); + + Unpacker( PackInput input ) + { + super( input ); + } + + @Override + public AnyValue unpack() throws IOException + { + PackType valType = peekNextType(); + switch ( valType ) + { + case BYTES: + return byteArray( unpackBytes() ); + case STRING: + return Values.utf8Value( unpackUTF8() ); + case INTEGER: + return Values.longValue( unpackLong() ); + case FLOAT: + return Values.doubleValue( unpackDouble() ); + case BOOLEAN: + return Values.booleanValue( unpackBoolean() ); + case NULL: + // still need to move past the null value + unpackNull(); + return Values.NO_VALUE; + case LIST: + { + return unpackList(); + } + case MAP: + { + return unpackMap(); + } + case STRUCT: + { + unpackStructHeader(); + char signature = unpackStructSignature(); + switch ( signature ) + { + case NODE: + { + throw new BoltIOException( Status.Request.Invalid, "Nodes cannot be unpacked." ); + } + case RELATIONSHIP: + { + throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." ); + } + case UNBOUND_RELATIONSHIP: + { + throw new BoltIOException( Status.Request.Invalid, "Relationships cannot be unpacked." ); + } + case PATH: + { + throw new BoltIOException( Status.Request.Invalid, "Paths cannot be unpacked." ); + } + default: + throw new BoltIOException( Status.Request.InvalidFormat, + "Unknown struct type: " + Integer.toHexString( signature ) ); + } + } + case END_OF_STREAM: + { + unpackEndOfStream(); + return null; + } + default: + throw new BoltIOException( Status.Request.InvalidFormat, + "Unknown value type: " + valType ); + } + } + + ListValue unpackList() throws IOException + { + int size = (int) unpackListHeader(); + if ( size == 0 ) + { + return VirtualValues.EMPTY_LIST; + } + ArrayList list; + if ( size == UNKNOWN_SIZE ) + { + list = new ArrayList<>(); + boolean more = true; + while ( more ) + { + PackType keyType = peekNextType(); + switch ( keyType ) + { + case END_OF_STREAM: + unpack(); + more = false; + break; + default: + list.add( unpack() ); + } + } + } + else + { + list = new ArrayList<>( size ); + for ( int i = 0; i < size; i++ ) + { + list.add( unpack() ); + } + } + return VirtualValues.list( list.toArray( new AnyValue[list.size()] ) ); + } + + @Override + public MapValue unpackMap() throws IOException + { + int size = (int) unpackMapHeader(); + if ( size == 0 ) + { + return VirtualValues.EMPTY_MAP; + } + Map map; + if ( size == UNKNOWN_SIZE ) + { + map = new HashMap<>(); + boolean more = true; + while ( more ) + { + PackType keyType = peekNextType(); + String key; + AnyValue val; + switch ( keyType ) + { + case END_OF_STREAM: + unpack(); + more = false; + break; + case STRING: + key = unpackString(); + val = unpack(); + if ( map.put( key, val ) != null ) + { + errors.add( + Neo4jError.from( Status.Request.Invalid, "Duplicate map key `" + key + "`." ) ); + } + break; + case NULL: + errors.add( Neo4jError.from( Status.Request.Invalid, + "Value `null` is not supported as key in maps, must be a non-nullable string." ) ); + unpackNull(); + val = unpack(); + map.put( null, val ); + break; + default: + throw new PackStream.PackStreamException( "Bad key type" ); + } + } + } + else + { + map = new HashMap<>( size, 1 ); + for ( int i = 0; i < size; i++ ) + { + PackType type = peekNextType(); + String key; + switch ( type ) + { + case NULL: + errors.add( Neo4jError.from( Status.Request.Invalid, + "Value `null` is not supported as key in maps, must be a non-nullable string." ) ); + unpackNull(); + key = null; + break; + case STRING: + key = unpackString(); + break; + default: + throw new PackStream.PackStreamException( "Bad key type: " + type ); + } + + AnyValue val = unpack(); + if ( map.put( key, val ) != null ) + { + errors.add( Neo4jError.from( Status.Request.Invalid, "Duplicate map key `" + key + "`." ) ); + } + } + } + return VirtualValues.map( map ); + } + + @Override + public Map unpackToRawMap() throws IOException + { + MapValue mapValue = unpackMap(); + HashMap map = new HashMap<>( mapValue.size() ); + for ( Map.Entry entry : mapValue.entrySet() ) + { + UnpackerWriter unpackerWriter = new UnpackerWriter(); + entry.getValue().writeTo( unpackerWriter ); + map.put( entry.getKey(), unpackerWriter.value() ); + } + return map; + } + + @Override + public Optional consumeError() + { + if ( errors.isEmpty() ) + { + return Optional.empty(); + } + else + { + Neo4jError combined = Neo4jError.combine( errors ); + errors.clear(); + return Optional.of( combined ); + } + } + } + + private static class Error + { + private final Status status; + private final String msg; + + private Error( Status status, String msg ) + { + this.status = status; + this.msg = msg; + } + + Status status() + { + return status; + } + + String msg() + { + return msg; + } + } + + private static class UnpackerWriter extends BaseToObjectValueWriter + { + + @Override + protected Node newNodeProxyById( long id ) + { + throw new UnsupportedOperationException( "Cannot unpack nodes" ); + } + + @Override + protected Relationship newRelationshipProxyById( long id ) + { + throw new UnsupportedOperationException( "Cannot unpack relationships" ); + } + + @Override + protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate ) + { + throw new UnsupportedOperationException( "Cannot unpack points" ); + } + } +} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java index 2446a6e20dd2..6fa104bcfb3f 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1Handler.java @@ -57,14 +57,16 @@ public class BoltMessagingProtocolV1Handler implements BoltMessagingProtocolHand private final Log internalLog; - public BoltMessagingProtocolV1Handler( BoltChannel boltChannel, BoltWorker worker, TransportThrottleGroup throttleGroup, LogService logging ) + public BoltMessagingProtocolV1Handler( BoltChannel boltChannel, Neo4jPack neo4jPack, BoltWorker worker, + TransportThrottleGroup throttleGroup, LogService logging ) { this.chunkedOutput = new ChunkedOutput( boltChannel.rawChannel(), DEFAULT_OUTPUT_BUFFER_SIZE, throttleGroup ); this.packer = new BoltResponseMessageWriter( - new Neo4jPack.Packer( chunkedOutput ), chunkedOutput, boltChannel.log() ); + neo4jPack.newPacker( chunkedOutput ), chunkedOutput, boltChannel.log() ); this.worker = worker; this.internalLog = logging.getInternalLog( getClass() ); this.dechunker = new BoltV1Dechunker( + neo4jPack, new BoltMessageRouter( internalLog, boltChannel.log(), worker, packer, this::onMessageDone ), this::onMessageStarted ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltV1Dechunker.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltV1Dechunker.java index 3218f2522513..3e669a9bd6c8 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltV1Dechunker.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltV1Dechunker.java @@ -45,12 +45,13 @@ public enum State private State state = State.AWAITING_CHUNK; private int chunkSize; - public BoltV1Dechunker( BoltRequestMessageHandler messageHandler, Runnable onMessageStarted ) + public BoltV1Dechunker( Neo4jPack neo4jPack, BoltRequestMessageHandler messageHandler, + Runnable onMessageStarted ) { this.onMessage = messageHandler; this.onMessageStarted = onMessageStarted; this.input = new ChunkedInput(); - this.unpacker = new BoltRequestMessageReader( new Neo4jPack.Unpacker( input ) ); + this.unpacker = new BoltRequestMessageReader( neo4jPack.newUnpacker( input ) ); } /** Check if we are currently "in the middle of" a message, eg. we've gotten parts of it, but are waiting for more. */ diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltRequestMessageTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltRequestMessageTest.java index 6a1e29914029..620beece342f 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltRequestMessageTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltRequestMessageTest.java @@ -129,10 +129,11 @@ private void assertSerializes( RequestMessage msg ) throws IOException private T serializeAndDeserialize( T msg ) throws IOException { RecordingByteChannel channel = new RecordingByteChannel(); + Neo4jPack neo4jPack = new Neo4jPackV1(); BoltRequestMessageReader reader = new BoltRequestMessageReader( - new Neo4jPack.Unpacker( new BufferedChannelInput( 16 ).reset( channel ) ) ); + neo4jPack.newUnpacker( new BufferedChannelInput( 16 ).reset( channel ) ) ); BoltRequestMessageWriter writer = new BoltRequestMessageWriter( - new Neo4jPack.Packer( new BufferedChannelOutput( channel ) ), NO_BOUNDARY_HOOK ); + neo4jPack.newPacker( new BufferedChannelOutput( channel ) ), NO_BOUNDARY_HOOK ); writer.write( msg ).flush(); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageTest.java index 3f81367d390b..677400b767ff 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/BoltResponseMessageTest.java @@ -226,10 +226,11 @@ private void assertSerializes( ResponseMessage msg ) throws IOException private T serializeAndDeserialize( T msg ) throws IOException { RecordingByteChannel channel = new RecordingByteChannel(); + Neo4jPack neo4jPack = new Neo4jPackV1(); BoltResponseMessageReader reader = new BoltResponseMessageReader( - new Neo4jPack.Unpacker( new BufferedChannelInput( 16 ).reset( channel ) ) ); + neo4jPack.newUnpacker( new BufferedChannelInput( 16 ).reset( channel ) ) ); BoltResponseMessageWriter writer = new BoltResponseMessageWriter( - new Neo4jPack.Packer( new BufferedChannelOutput( channel ) ), NO_BOUNDARY_HOOK, + neo4jPack.newPacker( new BufferedChannelOutput( channel ) ), NO_BOUNDARY_HOOK, NullBoltMessageLogger.getInstance() ); msg.dispatch( writer ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/Neo4jPackTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1Test.java similarity index 81% rename from community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/Neo4jPackTest.java rename to community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1Test.java index 0c18b6a47147..771bdb869d00 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/Neo4jPackTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/Neo4jPackV1Test.java @@ -24,6 +24,7 @@ import org.junit.rules.ExpectedException; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; @@ -39,7 +40,6 @@ import org.neo4j.values.storable.TextArray; import org.neo4j.values.storable.TextValue; import org.neo4j.values.storable.UTF8StringValue; -import org.neo4j.values.storable.Values; import org.neo4j.values.virtual.ListValue; import org.neo4j.values.virtual.MapValue; import org.neo4j.values.virtual.PathValue; @@ -52,19 +52,24 @@ import static org.neo4j.bolt.v1.messaging.example.Edges.ALICE_KNOWS_BOB; import static org.neo4j.bolt.v1.messaging.example.Nodes.ALICE; import static org.neo4j.bolt.v1.messaging.example.Paths.ALL_PATHS; +import static org.neo4j.values.storable.Values.charArray; import static org.neo4j.values.storable.Values.charValue; +import static org.neo4j.values.storable.Values.intValue; import static org.neo4j.values.storable.Values.longValue; import static org.neo4j.values.storable.Values.stringValue; +import static org.neo4j.values.storable.Values.utf8Value; -public class Neo4jPackTest +public class Neo4jPackV1Test { + private final Neo4jPackV1 neo4jPack = new Neo4jPackV1(); + @Rule public ExpectedException exception = ExpectedException.none(); private byte[] packed( AnyValue object ) throws IOException { PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); packer.pack( object ); return output.bytes(); } @@ -72,18 +77,17 @@ private byte[] packed( AnyValue object ) throws IOException private AnyValue unpacked( byte[] bytes ) throws IOException { PackedInputArray input = new PackedInputArray( bytes ); - Neo4jPack.Unpacker unpacker = new Neo4jPack.Unpacker( input ); + Neo4jPack.Unpacker unpacker = neo4jPack.newUnpacker( input ); return unpacker.unpack(); } - @SuppressWarnings( "unchecked" ) @Test - public void shouldBeAbleToPackAndUnpackListStream() throws IOException + public void shouldBeAbleToPackAndUnpackList() throws IOException { // Given PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); - packer.packListStreamHeader(); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); + packer.packListHeader( ALICE.labels().length() ); List expected = new ArrayList<>(); TextArray labels = ALICE.labels(); for ( int i = 0; i < labels.length(); i++ ) @@ -92,7 +96,6 @@ public void shouldBeAbleToPackAndUnpackListStream() throws IOException packer.pack( labelName ); expected.add( labelName ); } - packer.packEndOfStream(); AnyValue unpacked = unpacked( output.bytes() ); // Then @@ -101,14 +104,13 @@ public void shouldBeAbleToPackAndUnpackListStream() throws IOException assertThat( unpackedList, equalTo( ValueUtils.asListValue( expected ) ) ); } - @SuppressWarnings( "unchecked" ) @Test - public void shouldBeAbleToPackAndUnpackMapStream() throws IOException + public void shouldBeAbleToPackAndUnpackMap() throws IOException { // Given PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); - packer.packMapStreamHeader(); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); + packer.packMapHeader( ALICE.properties().size() ); ALICE.properties().foreach( ( s, value ) -> { try @@ -118,10 +120,9 @@ public void shouldBeAbleToPackAndUnpackMapStream() throws IOException } catch ( IOException e ) { - e.printStackTrace(); + throw new UncheckedIOException( e ); } } ); - packer.packEndOfStream(); AnyValue unpacked = unpacked( output.bytes() ); // Then @@ -130,28 +131,26 @@ public void shouldBeAbleToPackAndUnpackMapStream() throws IOException assertThat( unpackedMap, equalTo( ALICE.properties() ) ); } - @SuppressWarnings( "unchecked" ) @Test - public void shouldFailWhenTryingToPackAndUnpackMapStreamContainingNullKeys() throws IOException + public void shouldFailWhenTryingToPackAndUnpackMapContainingNullKeys() throws IOException { // Given PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); - packer.packMapStreamHeader(); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); - HashMap map = new HashMap<>(); + Map map = new HashMap<>(); map.put( null, longValue( 42L ) ); map.put( "foo", longValue( 1337L ) ); + packer.packMapHeader( map.size() ); for ( Map.Entry entry : map.entrySet() ) { packer.pack( entry.getKey() ); packer.pack( entry.getValue() ); } - packer.packEndOfStream(); // When PackedInputArray input = new PackedInputArray( output.bytes() ); - Neo4jPack.Unpacker unpacker = new Neo4jPack.Unpacker( input ); + Neo4jPack.Unpacker unpacker = neo4jPack.newUnpacker( input ); unpacker.unpack(); // Then @@ -165,16 +164,16 @@ public void shouldErrorOnUnpackingMapWithDuplicateKeys() throws IOException { // Given PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); packer.packMapHeader( 2 ); packer.pack( "key" ); - packer.pack( 1 ); + packer.pack( intValue( 1 ) ); packer.pack( "key" ); - packer.pack( 2 ); + packer.pack( intValue( 2 ) ); // When PackedInputArray input = new PackedInputArray( output.bytes() ); - Neo4jPack.Unpacker unpacker = new Neo4jPack.Unpacker( input ); + Neo4jPack.Unpacker unpacker = neo4jPack.newUnpacker( input ); unpacker.unpack(); // Then @@ -217,7 +216,7 @@ public void shouldTreatSingleCharAsSingleCharacterString() throws IOException { // Given PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); packer.pack( charValue( 'C' ) ); AnyValue unpacked = unpacked( output.bytes() ); @@ -231,8 +230,8 @@ public void shouldTreatCharArrayAsListOfStrings() throws IOException { // Given PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); - packer.pack( Values.charArray( new char[]{'W', 'H', 'Y'} ) ); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); + packer.pack( charArray( new char[]{'W', 'H', 'Y'} ) ); Object unpacked = unpacked( output.bytes() ); // Then @@ -247,9 +246,9 @@ public void shouldPackUtf8() throws IOException // Given String value = "\uD83D\uDE31"; byte[] bytes = value.getBytes( StandardCharsets.UTF_8 ); - TextValue textValue = Values.utf8Value( bytes, 0, bytes.length ); + TextValue textValue = utf8Value( bytes, 0, bytes.length ); PackedOutputArray output = new PackedOutputArray(); - Neo4jPack.Packer packer = new Neo4jPack.Packer( output ); + Neo4jPack.Packer packer = neo4jPack.newPacker( output ); packer.pack( textValue ); // When diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/util/MessageMatchers.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/util/MessageMatchers.java index c0c3d161ef13..0c9af48c2476 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/util/MessageMatchers.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/util/MessageMatchers.java @@ -40,6 +40,7 @@ import org.neo4j.bolt.v1.messaging.BoltResponseMessageRecorder; import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter; import org.neo4j.bolt.v1.messaging.Neo4jPack; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.messaging.RecordingByteChannel; import org.neo4j.bolt.v1.messaging.message.FailureMessage; import org.neo4j.bolt.v1.messaging.message.IgnoredMessage; @@ -50,13 +51,13 @@ import org.neo4j.bolt.v1.packstream.BufferedChannelInput; import org.neo4j.bolt.v1.packstream.BufferedChannelOutput; import org.neo4j.bolt.v1.transport.integration.TestNotification; -import org.neo4j.kernel.impl.util.BaseToObjectValueWriter; import org.neo4j.cypher.result.QueryResult; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Notification; import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.spatial.Point; import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.impl.util.BaseToObjectValueWriter; import org.neo4j.kernel.impl.util.HexPrinter; import org.neo4j.values.AnyValue; import org.neo4j.values.storable.CoordinateReferenceSystem; @@ -295,30 +296,33 @@ public void describeTo( Description description ) public static byte[] serialize( RequestMessage... messages ) throws IOException { - final RecordingByteChannel rawData = new RecordingByteChannel(); - final BoltRequestMessageWriter packer = new BoltRequestMessageWriter( new Neo4jPack.Packer( new - BufferedChannelOutput( rawData ) ), NO_BOUNDARY_HOOK ); + RecordingByteChannel rawData = new RecordingByteChannel(); + Neo4jPack neo4jPack = new Neo4jPackV1(); + Neo4jPack.Packer packer = neo4jPack.newPacker( new BufferedChannelOutput( rawData ) ); + BoltRequestMessageWriter writer = new BoltRequestMessageWriter( packer, NO_BOUNDARY_HOOK ); for ( RequestMessage message : messages ) { - packer.write( message ); + writer.write( message ); } - packer.flush(); + writer.flush(); return rawData.getBytes(); } public static byte[] serialize( ResponseMessage... messages ) throws IOException { - final RecordingByteChannel rawData = new RecordingByteChannel(); - final BoltResponseMessageWriter packer = new BoltResponseMessageWriter( new Neo4jPack.Packer( new - BufferedChannelOutput( rawData ) ), NO_BOUNDARY_HOOK, NullBoltMessageLogger.getInstance() ); + RecordingByteChannel rawData = new RecordingByteChannel(); + Neo4jPack neo4jPack = new Neo4jPackV1(); + Neo4jPack.Packer packer = neo4jPack.newPacker( new BufferedChannelOutput( rawData ) ); + BoltResponseMessageWriter writer = new BoltResponseMessageWriter( packer, NO_BOUNDARY_HOOK, + NullBoltMessageLogger.getInstance() ); for ( ResponseMessage message : messages ) { - message.dispatch( packer ); + message.dispatch( writer ); } - packer.flush(); + writer.flush(); return rawData.getBytes(); } @@ -370,14 +374,18 @@ public static ResponseMessage responseMessage( byte[] bytes ) throws IOException private static BoltRequestMessageReader requestReader( byte[] bytes ) { - return new BoltRequestMessageReader( - new Neo4jPack.Unpacker( new BufferedChannelInput( 128 ).reset( new ArrayByteChannel( bytes ) ) ) ); + BufferedChannelInput input = new BufferedChannelInput( 128 ); + input.reset( new ArrayByteChannel( bytes ) ); + Neo4jPack neo4jPack = new Neo4jPackV1(); + return new BoltRequestMessageReader( neo4jPack.newUnpacker( input ) ); } private static BoltResponseMessageReader responseReader( byte[] bytes ) { - return new BoltResponseMessageReader( - new Neo4jPack.Unpacker( new BufferedChannelInput( 128 ).reset( new ArrayByteChannel( bytes ) ) ) ); + BufferedChannelInput input = new BufferedChannelInput( 128 ); + input.reset( new ArrayByteChannel( bytes ) ); + Neo4jPack neo4jPack = new Neo4jPackV1(); + return new BoltResponseMessageReader( neo4jPack.newUnpacker( input ) ); } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java index fa1f69d39b62..f0a2c3e038e0 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolV1HandlerTest.java @@ -31,6 +31,7 @@ import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.transport.TransportThrottleGroup; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltStateMachine; import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; @@ -61,8 +62,9 @@ public void shouldNotTalkToChannelDirectlyOnFatalError() throws Throwable when( boltChannel.rawChannel() ).thenReturn( outputChannel ); BoltStateMachine machine = mock( BoltStateMachine.class ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( - boltChannel, new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); + BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, new Neo4jPackV1(), + new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, + NullLogService.getInstance() ); verify( outputChannel ).alloc(); // And given inbound data that'll explode when the protocol tries to interpret it @@ -94,8 +96,9 @@ public void closesInputAndOutput() BoltChannel boltChannel = mock( BoltChannel.class ); when( boltChannel.rawChannel() ).thenReturn( outputChannel ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( - boltChannel, new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); + BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, new Neo4jPackV1(), + new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, + NullLogService.getInstance() ); protocol.close(); verify( machine ).close(); @@ -115,8 +118,8 @@ public void messageProcessingErrorIsLogged() throws IOException BoltChannel boltChannel = mock( BoltChannel.class ); when( boltChannel.rawChannel() ).thenReturn( outputChannel ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( - boltChannel, mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE, logService ); + BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, new Neo4jPackV1(), + mock( BoltWorker.class ), TransportThrottleGroup.NO_THROTTLE, logService ); protocol.handle( mock( ChannelHandlerContext.class ), data ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltV1DechunkerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltV1DechunkerTest.java index 6beb7ed8cffd..7afb602584e0 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltV1DechunkerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/BoltV1DechunkerTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.neo4j.bolt.v1.messaging.BoltRequestMessageRecorder; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.messaging.message.RunMessage; import org.neo4j.bolt.v1.messaging.util.MessageMatchers; @@ -56,7 +57,7 @@ public void shouldReadMessageWhenTheHeaderIsSplitAcrossChunks() throws Exception System.arraycopy( message, 0, chunk2, 1, message.length ); BoltRequestMessageRecorder messages = new BoltRequestMessageRecorder(); - BoltV1Dechunker dechunker = new BoltV1Dechunker( messages, () -> + BoltV1Dechunker dechunker = new BoltV1Dechunker( new Neo4jPackV1(), messages, () -> { } ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java index d19ebd518ea0..a7e42f9547db 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java @@ -32,7 +32,7 @@ import org.neo4j.bolt.logging.NullBoltMessageLogger; import org.neo4j.bolt.transport.TransportThrottleGroup; import org.neo4j.bolt.v1.messaging.BoltRequestMessageWriter; -import org.neo4j.bolt.v1.messaging.Neo4jPack; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.messaging.RecordingByteChannel; import org.neo4j.bolt.v1.messaging.message.RequestMessage; import org.neo4j.bolt.v1.messaging.message.RunMessage; @@ -124,8 +124,9 @@ private void testPermutation( byte[] unfragmented, ByteBuf[] fragments ) throws when( boltChannel.rawChannel() ).thenReturn( ch ); when( boltChannel.log() ).thenReturn( NullBoltMessageLogger.getInstance() ); - BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( - boltChannel, new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); + BoltMessagingProtocolV1Handler protocol = new BoltMessagingProtocolV1Handler( boltChannel, new Neo4jPackV1(), + new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, + NullLogService.getInstance() ); // When data arrives split up according to the current permutation for ( ByteBuf fragment : fragments ) @@ -176,7 +177,7 @@ private byte[] serialize( int chunkSize, RequestMessage... msgs ) throws IOExcep RecordingByteChannel channel = new RecordingByteChannel(); BoltRequestMessageWriter writer = new BoltRequestMessageWriter( - new Neo4jPack.Packer( new BufferedChannelOutput( channel ) ), NO_BOUNDARY_HOOK ); + new Neo4jPackV1().newPacker( new BufferedChannelOutput( channel ) ), NO_BOUNDARY_HOOK ); writer.write( msgs[i] ).flush(); serialized[i] = channel.getBytes(); } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java index c05d68a7700e..6efa349ab5a4 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java @@ -31,6 +31,7 @@ import org.neo4j.bolt.transport.BoltProtocolHandlerFactory; import org.neo4j.bolt.transport.SocketTransportHandler; import org.neo4j.bolt.transport.TransportThrottleGroup; +import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltStateMachine; import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler; @@ -203,7 +204,7 @@ private static BoltHandshakeProtocolHandler newHandshakeHandler( BoltStateMachin BoltProtocolHandlerFactory handlerFactory = ( version, channel ) -> { assertEquals( 1, version ); - return new BoltMessagingProtocolV1Handler( channel, new SynchronousBoltWorker( machine ), + return new BoltMessagingProtocolV1Handler( channel, new Neo4jPackV1(), new SynchronousBoltWorker( machine ), TransportThrottleGroup.NO_THROTTLE, NullLogService.getInstance() ); };