Skip to content

Commit

Permalink
Small cleanup around Neo4jPack
Browse files Browse the repository at this point in the history
Removed unneeded and unused methods and unified exception propagation
between Packer and Unpacker.
  • Loading branch information
lutovich committed Jan 9, 2018
1 parent 8f9e85d commit 072f77b
Show file tree
Hide file tree
Showing 17 changed files with 248 additions and 179 deletions.
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.messaging;

import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.spatial.Point;
import org.neo4j.kernel.impl.util.BaseToObjectValueWriter;
import org.neo4j.values.AnyValue;
import org.neo4j.values.AnyValueWriter;
import org.neo4j.values.storable.CoordinateReferenceSystem;

/**
* {@link AnyValueWriter Writer} that allows to convert {@link AnyValue} to any primitive Java type. It explicitly
* prohibits conversion of nodes, relationships and points. They are not expected in auth token map.
*/
class AuthTokenValuesWriter extends BaseToObjectValueWriter<RuntimeException>
{
Object valueAsObject( AnyValue value )
{
value.writeTo( this );
return value();
}

@Override
protected Node newNodeProxyById( long id )
{
throw new UnsupportedOperationException( "INIT message metadata should not contain nodes" );
}

@Override
protected Relationship newRelationshipProxyById( long id )
{
throw new UnsupportedOperationException( "INIT message metadata should not contain relationships" );
}

@Override
protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate )
{
throw new UnsupportedOperationException( "INIT message metadata should not contain points" );
}
}
Expand Up @@ -21,13 +21,14 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import org.neo4j.bolt.v1.packstream.PackStream;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.values.virtual.MapValue;

import static java.util.stream.Collectors.toMap;

/**
* Reader for Bolt request messages made available via a {@link Neo4jPack.Unpacker}.
*/
Expand All @@ -40,11 +41,6 @@ public BoltRequestMessageReader( Neo4jPack.Unpacker unpacker )
this.unpacker = unpacker;
}

public boolean hasNext() throws IOException
{
return unpacker.hasNext();
}

/**
* Parse and handle a single message by handing it off
* to a {@link BoltRequestMessageHandler} instance.
Expand All @@ -64,8 +60,8 @@ public <E extends Exception> void read( BoltRequestMessageHandler<E> handler ) t
{
case INIT:
String clientName = unpacker.unpackString();
Map<String,Object> credentials = unpacker.unpackToRawMap();
handler.onInit( clientName, credentials );
Map<String,Object> authToken = readAuthToken( unpacker );
handler.onInit( clientName, authToken );
break;
case ACK_FAILURE:
handler.onAckFailure();
Expand All @@ -76,10 +72,10 @@ public <E extends Exception> void read( BoltRequestMessageHandler<E> handler ) t
case RUN:
String statement = unpacker.unpackString();
MapValue params = unpacker.unpackMap();
Optional<Neo4jError> error = unpacker.consumeError();
if ( error.isPresent() )
Neo4jError error = unpacker.consumeError();
if ( error != null )
{
handler.onExternalError( error.get() );
handler.onExternalError( error );
}
else
{
Expand Down Expand Up @@ -109,4 +105,13 @@ public <E extends Exception> void read( BoltRequestMessageHandler<E> handler ) t
"Error was: " + e.getMessage(), e );
}
}

private static Map<String,Object> readAuthToken( Neo4jPack.Unpacker unpacker ) throws IOException
{
MapValue authTokenValue = unpacker.unpackMap();
AuthTokenValuesWriter writer = new AuthTokenValuesWriter();
return authTokenValue.entrySet()
.stream()
.collect( toMap( Map.Entry::getKey, entry -> writer.valueAsObject( entry.getValue() ) ) );
}
}
Expand Up @@ -73,15 +73,19 @@ public void onRecord( QueryResult.Record item ) throws IOException
//The record might contain unpackable values,
//hence we must consume any errors that might
//have occurred.
packer.consumeError(); // TODO: find a better way
IOException error = packer.consumeError();
if ( error != null )
{
throw error;
}
}

@Override
public void onSuccess( MapValue metadata ) throws IOException
{
messageLogger.logSuccess( () -> metadata );
packer.packStructHeader( 1, SUCCESS.signature() );
packer.packRawMap( metadata );
packer.pack( metadata );
onMessageComplete.onMessageComplete();
}

Expand Down
Expand Up @@ -20,55 +20,51 @@
package org.neo4j.bolt.v1.messaging;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import org.neo4j.bolt.v1.packstream.PackInput;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.MapValue;

/**
* Represents a single Bolt message format by exposing a {@link Packer packer} and {@link Unpacker unpacker}
* for primitives of this format.
*/
public interface Neo4jPack
{
interface Packer
{
void packStructHeader( int size, byte signature ) throws IOException;
void pack( String value ) throws IOException;

void pack( AnyValue value ) throws IOException;

void packRawMap( MapValue map ) throws IOException;
void packStructHeader( int size, byte signature ) throws IOException;

void packMapHeader( int size ) throws IOException;

void flush() throws IOException;

void pack( String value ) throws IOException;

void packListHeader( int size ) throws IOException;

void consumeError() throws BoltIOException;
IOException consumeError();

void flush() throws IOException;
}

interface Unpacker
{
boolean hasNext() throws IOException;

long unpackStructHeader() throws IOException;

char unpackStructSignature() throws IOException;
AnyValue unpack() throws IOException;

String unpackString() throws IOException;

Map<String,Object> unpackToRawMap() throws IOException;

MapValue unpackMap() throws IOException;

long unpackListHeader() throws IOException;
long unpackStructHeader() throws IOException;

AnyValue unpack() throws IOException;
char unpackStructSignature() throws IOException;

long unpackListHeader() throws IOException;

Optional<Neo4jError> consumeError();
Neo4jError consumeError();
}

Packer newPacker( PackOutput output );
Expand Down
Expand Up @@ -24,25 +24,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.neo4j.bolt.v1.packstream.PackInput;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.bolt.v1.packstream.PackStream;
import org.neo4j.bolt.v1.packstream.PackType;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.collection.primitive.PrimitiveLongIntKeyValueArray;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.spatial.Point;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.util.BaseToObjectValueWriter;
import org.neo4j.values.AnyValue;
import org.neo4j.values.AnyValueWriter;
import org.neo4j.values.storable.CoordinateReferenceSystem;
import org.neo4j.values.storable.TextArray;
import org.neo4j.values.storable.TextValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.storable.CoordinateReferenceSystem;
import org.neo4j.values.virtual.EdgeValue;
import org.neo4j.values.virtual.ListValue;
import org.neo4j.values.virtual.MapValue;
Expand Down Expand Up @@ -97,25 +92,15 @@ public void pack( AnyValue value ) throws IOException
}

@Override
public void packRawMap( MapValue map ) throws IOException
{
packMapHeader( map.size() );
for ( Map.Entry<String,AnyValue> entry : map.entrySet() )
{
pack( entry.getKey() );
pack( entry.getValue() );
}
}

@Override
public void consumeError() throws BoltIOException
public IOException consumeError()
{
if ( error != null )
{
BoltIOException exception = new BoltIOException( error.status(), error.msg() );
IOException exception = new BoltIOException( error.status(), error.msg() );
error = null;
throw exception;
return exception;
}
return null;
}

@Override
Expand Down Expand Up @@ -592,32 +577,11 @@ public MapValue unpackMap() throws IOException
}

@Override
public Map<String,Object> unpackToRawMap() throws IOException
public Neo4jError consumeError()
{
MapValue mapValue = unpackMap();
HashMap<String,Object> map = new HashMap<>( mapValue.size() );
for ( Map.Entry<String,AnyValue> entry : mapValue.entrySet() )
{
UnpackerWriter unpackerWriter = new UnpackerWriter();
entry.getValue().writeTo( unpackerWriter );
map.put( entry.getKey(), unpackerWriter.value() );
}
return map;
}

@Override
public Optional<Neo4jError> consumeError()
{
if ( errors.isEmpty() )
{
return Optional.empty();
}
else
{
Neo4jError combined = Neo4jError.combine( errors );
errors.clear();
return Optional.of( combined );
}
Neo4jError error = Neo4jError.combine( errors );
errors.clear();
return error;
}
}

Expand All @@ -642,26 +606,4 @@ String msg()
return msg;
}
}

private static class UnpackerWriter extends BaseToObjectValueWriter<RuntimeException>
{

@Override
protected Node newNodeProxyById( long id )
{
throw new UnsupportedOperationException( "Cannot unpack nodes" );
}

@Override
protected Relationship newRelationshipProxyById( long id )
{
throw new UnsupportedOperationException( "Cannot unpack relationships" );
}

@Override
protected Point newPoint( CoordinateReferenceSystem crs, double[] coordinate )
{
throw new UnsupportedOperationException( "Cannot unpack points" );
}
}
}
Expand Up @@ -27,9 +27,6 @@
*/
public interface PackInput
{
/** True if there is at least one more consumable byte */
boolean hasMoreData() throws IOException;

/** Consume one byte */
byte readByte() throws IOException;

Expand Down
Expand Up @@ -471,11 +471,6 @@ public Unpacker( PackInput in )
this.in = in;
}

public boolean hasNext() throws IOException
{
return in.hasMoreData();
}

// TODO: This currently returns the number of fields in the struct. In 99% of cases we will look at the struct
// signature to determine how to read it, suggest we make that what we return here,
// and have the number of fields available through some alternate optional mechanism.
Expand Down
Expand Up @@ -34,12 +34,6 @@ public PackedInputArray( byte[] bytes )
this.data = new DataInputStream( this.bytes );
}

@Override
public boolean hasMoreData() throws IOException
{
return data.available() >= 1;
}

@Override
public byte readByte() throws IOException
{
Expand Down
Expand Up @@ -192,9 +192,11 @@ public static Neo4jError from( Throwable any )

public static Neo4jError combine( List<Neo4jError> errors )
{
assert errors.size() >= 1;

if ( errors.size() == 1 )
if ( errors == null || errors.isEmpty() )
{
return null;
}
else if ( errors.size() == 1 )
{
return errors.get( 0 );
}
Expand All @@ -214,7 +216,7 @@ public static Neo4jError combine( List<Neo4jError> errors )
.append( error.message );
}

return from(combinedStatus, combinedMessage.toString());
return from( combinedStatus, combinedMessage.toString() );
}
}

Expand Down

0 comments on commit 072f77b

Please sign in to comment.