Skip to content

Commit

Permalink
Better parameter handling in BOLT
Browse files Browse the repository at this point in the history
When reading data from the wire Bolt has full knowledge of types
and can thus create a MapValue directly without Cypher needing to
do the translation of `Object` -> `AnyValue`.
  • Loading branch information
pontusmelke committed Sep 14, 2017
1 parent 18993a8 commit 1a242dd
Show file tree
Hide file tree
Showing 145 changed files with 631 additions and 365 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.cypher.result.QueryResult;
import org.neo4j.logging.Log;
import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.MapValue;

/**
* This class is responsible for routing incoming request messages to a worker
Expand Down Expand Up @@ -86,7 +87,7 @@ public void onReset() throws RuntimeException
}

@Override
public void onRun( String statement, Map<String,Object> params )
public void onRun( String statement, MapValue params )
{
messageLogger.logRun( statement, () -> params );
worker.enqueue( session -> session.run( statement, params, runHandler ) );
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.values.virtual.MapValue;

/**
* Interface defining simple handler methods for each defined
Expand All @@ -37,7 +38,7 @@ public interface BoltRequestMessageHandler<E extends Exception>

void onReset() throws E;

void onRun( String statement, Map<String,Object> params ) throws E;
void onRun( String statement, MapValue params ) throws E;

void onDiscardAll() throws E;

Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.bolt.v1.packstream.PackStream;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.values.virtual.MapValue;

/**
* Reader for Bolt request messages made available via a {@link Neo4jPack.Unpacker}.
Expand Down Expand Up @@ -74,7 +75,7 @@ public <E extends Exception> void read( BoltRequestMessageHandler<E> handler ) t
break;
case RUN:
String statement = unpacker.unpackString();
Map<String,Object> params = unpacker.unpackToRawMap();
MapValue params = unpacker.unpackMap();
Optional<Neo4jError> error = unpacker.consumeError();
if ( error.isPresent() )
{
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.bolt.v1.packstream.PackType;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.collection.primitive.PrimitiveLongIntKeyValueArray;
import org.neo4j.cypher.internal.javacompat.BaseToObjectValueWriter;
import org.neo4j.helpers.BaseToObjectValueWriter;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.spatial.Point;
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;

import static org.neo4j.kernel.api.security.AuthToken.PRINCIPAL;
import static org.neo4j.values.storable.Values.stringArray;
Expand Down Expand Up @@ -189,7 +190,7 @@ public void reset( BoltResponseHandler handler ) throws BoltConnectionFatality
* {@link #pullAll(BoltResponseHandler) pulled} or {@link #discardAll(BoltResponseHandler)
* discarded}.
*/
public void run( String statement, Map<String,Object> params, BoltResponseHandler handler )
public void run( String statement, MapValue params, BoltResponseHandler handler )
throws BoltConnectionFatality
{
long start = clock.millis();
Expand Down Expand Up @@ -389,7 +390,7 @@ public State init( BoltStateMachine machine, String userAgent,
{
@Override
public State run( BoltStateMachine machine, String statement,
Map<String,Object> params ) throws BoltConnectionFatality
MapValue params ) throws BoltConnectionFatality
{
try
{
Expand Down Expand Up @@ -516,7 +517,7 @@ public State ackFailure( BoltStateMachine machine ) throws BoltConnectionFatalit

@Override
public State run( BoltStateMachine machine, String statement,
Map<String,Object> params )
MapValue params )
{
machine.ctx.markIgnored();
return FAILED;
Expand Down Expand Up @@ -570,8 +571,7 @@ public State ackFailure( BoltStateMachine machine ) throws BoltConnectionFatalit
}

@Override
public State run( BoltStateMachine machine, String statement, Map<String,Object>
params ) throws BoltConnectionFatality
public State run( BoltStateMachine machine, String statement, MapValue params ) throws BoltConnectionFatality
{
machine.ctx.markIgnored();
return INTERRUPTED;
Expand Down Expand Up @@ -623,7 +623,7 @@ public State reset( BoltStateMachine machine ) throws BoltConnectionFatality
throw new BoltProtocolBreachFatality( msg );
}

public State run( BoltStateMachine machine, String statement, Map<String,Object> params ) throws
public State run( BoltStateMachine machine, String statement, MapValue params ) throws
BoltConnectionFatality
{
String msg = "RUN cannot be handled by a session in the " + name() + " state.";
Expand Down Expand Up @@ -801,7 +801,7 @@ interface SPI
private static class NullStatementProcessor implements StatementProcessor
{
@Override
public StatementMetadata run( String statement, Map<String,Object> params ) throws KernelException
public StatementMetadata run( String statement, MapValue params ) throws KernelException
{
throw new UnsupportedOperationException( "Unable to run any statements." );
}
Expand Down
Expand Up @@ -24,8 +24,8 @@
import java.util.List;
import java.util.Map;

import org.neo4j.cypher.internal.javacompat.ValueUtils;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.helpers.ValueUtils;
import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.ListValue;
import org.neo4j.values.virtual.MapValue;
Expand Down
Expand Up @@ -23,12 +23,11 @@
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;

import java.util.Map;
import org.neo4j.values.virtual.MapValue;

public interface StatementProcessor
{
StatementMetadata run( String statement, Map<String, Object> params ) throws KernelException;
StatementMetadata run( String statement, MapValue params ) throws KernelException;

void streamResult( ThrowingConsumer<BoltResult, Exception> resultConsumer ) throws Exception;

Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.bolt.v1.runtime;

import java.time.Clock;
import java.util.Map;
import java.util.regex.Pattern;

import org.neo4j.bolt.security.auth.AuthenticationResult;
Expand All @@ -36,6 +35,7 @@
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.SecurityContext;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
import org.neo4j.values.virtual.MapValue;

import static org.neo4j.function.ThrowingAction.noop;

Expand Down Expand Up @@ -69,7 +69,7 @@ private void before()
}

@Override
public StatementMetadata run( String statement, Map<String, Object> params ) throws KernelException
public StatementMetadata run( String statement, MapValue params ) throws KernelException
{
before();
try
Expand Down Expand Up @@ -153,7 +153,7 @@ enum State
{
@Override
State run( MutableTransactionState ctx, SPI spi, String statement,
Map<String, Object> params ) throws KernelException
MapValue params ) throws KernelException

{
if ( BEGIN.matcher( statement ).matches() )
Expand Down Expand Up @@ -218,7 +218,7 @@ else if ( ROLLBACK.matcher( statement ).matches() )
* transaction to null.
*/
private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params )
String statement, MapValue params )
throws TransactionFailureException, QueryExecutionKernelException
{
return executeQuery( ctx, spi, statement, params, () ->
Expand All @@ -240,7 +240,7 @@ void streamResult( MutableTransactionState ctx,
EXPLICIT_TRANSACTION
{
@Override
State run( MutableTransactionState ctx, SPI spi, String statement, Map<String, Object> params )
State run( MutableTransactionState ctx, SPI spi, String statement, MapValue params )
throws KernelException
{
if ( BEGIN.matcher( statement ).matches() )
Expand Down Expand Up @@ -289,7 +289,7 @@ else if ( ROLLBACK.matcher( statement ).matches() )
}

private BoltResultHandle execute( MutableTransactionState ctx, SPI spi,
String statement, Map<String,Object> params )
String statement, MapValue params )
throws QueryExecutionKernelException
{
return executeQuery( ctx, spi, statement, params,
Expand All @@ -315,7 +315,7 @@ void streamResult( MutableTransactionState ctx,
abstract State run( MutableTransactionState ctx,
SPI spi,
String statement,
Map<String, Object> params ) throws KernelException;
MapValue params ) throws KernelException;

abstract void streamResult( MutableTransactionState ctx,
ThrowingConsumer<BoltResult, Exception> resultConsumer ) throws Exception;
Expand Down Expand Up @@ -370,7 +370,7 @@ void closeTransaction( MutableTransactionState ctx, boolean success ) throws Tra
}

private static BoltResultHandle executeQuery( MutableTransactionState ctx, SPI spi, String statement,
Map<String,Object> params, ThrowingAction<KernelException> onFail )
MapValue params, ThrowingAction<KernelException> onFail )
throws QueryExecutionKernelException
{
return spi.executeQuery( ctx.querySource, ctx.securityContext, statement, params, onFail );
Expand Down Expand Up @@ -440,7 +440,7 @@ interface SPI
BoltResultHandle executeQuery( BoltQuerySource querySource,
SecurityContext securityContext,
String statement,
Map<String,Object> params,
MapValue params,
ThrowingAction<KernelException> onFail ) throws QueryExecutionKernelException;
}
}
Expand Up @@ -21,7 +21,6 @@

import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle;
Expand All @@ -48,6 +47,7 @@
import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.values.virtual.MapValue;

import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;

Expand Down Expand Up @@ -125,7 +125,7 @@ public boolean isPeriodicCommit( String query )
public BoltResultHandle executeQuery( BoltQuerySource querySource,
SecurityContext securityContext,
String statement,
Map<String,Object> params, ThrowingAction<KernelException> onFail ) throws QueryExecutionKernelException
MapValue params, ThrowingAction<KernelException> onFail ) throws QueryExecutionKernelException
{
InternalTransaction internalTransaction = queryService.beginTransaction( implicit, securityContext );
ClientConnectionInfo sourceDetails = new BoltConnectionInfo( querySource.principalName,
Expand Down
Expand Up @@ -19,11 +19,13 @@
*/
package org.neo4j.bolt.v1.runtime.bookmarking;

import java.util.List;
import java.util.Map;

import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.TextValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.ListValue;
import org.neo4j.values.virtual.MapValue;

import static java.lang.String.format;

Expand All @@ -46,7 +48,7 @@ public String toString()
return format( BOOKMARK_TX_PREFIX + "%d", txId );
}

public static Bookmark fromParamsOrNull( Map<String,Object> params ) throws BookmarkFormatException
public static Bookmark fromParamsOrNull( MapValue params ) throws BookmarkFormatException
{
// try to parse multiple bookmarks, if available
Bookmark bookmark = parseMultipleBookmarks( params );
Expand All @@ -64,24 +66,24 @@ public long txId()
return txId;
}

private static Bookmark parseMultipleBookmarks( Map<String,Object> params ) throws BookmarkFormatException
private static Bookmark parseMultipleBookmarks( MapValue params ) throws BookmarkFormatException
{
Object bookmarksObject = params.get( BOOKMARKS_KEY );
AnyValue bookmarksObject = params.get( BOOKMARKS_KEY );

if ( bookmarksObject == null )
if ( bookmarksObject == Values.NO_VALUE )
{
return null;
}
else if ( bookmarksObject instanceof List )
else if ( bookmarksObject instanceof ListValue )
{
List<?> bookmarks = (List<?>) bookmarksObject;
ListValue bookmarks = (ListValue) bookmarksObject;

long maxTxId = -1;
for ( Object bookmark : bookmarks )
for ( AnyValue bookmark : bookmarks )
{
if ( bookmark != null )
if ( bookmark != Values.NO_VALUE )
{
long txId = txIdFrom( bookmark.toString() );
long txId = txIdFrom( bookmark );
if ( txId > maxTxId )
{
maxTxId = txId;
Expand All @@ -96,20 +98,24 @@ else if ( bookmarksObject instanceof List )
}
}

private static Bookmark parseSingleBookmark( Map<String,Object> params ) throws BookmarkFormatException
private static Bookmark parseSingleBookmark( MapValue params ) throws BookmarkFormatException
{
Object bookmarkObject = params.get( BOOKMARK_KEY );
if ( bookmarkObject == null )
AnyValue bookmarkObject = params.get( BOOKMARK_KEY );
if ( bookmarkObject == Values.NO_VALUE )
{
return null;
}

String bookmarkString = bookmarkObject.toString();
return new Bookmark( txIdFrom( bookmarkString ) );
return new Bookmark( txIdFrom( bookmarkObject ) );
}

private static long txIdFrom( String bookmarkString ) throws BookmarkFormatException
private static long txIdFrom( AnyValue bookmark ) throws BookmarkFormatException
{
if ( !(bookmark instanceof TextValue) )
{
throw new BookmarkFormatException( bookmark );
}
String bookmarkString = ((TextValue) bookmark).stringValue();
if ( !bookmarkString.startsWith( BOOKMARK_TX_PREFIX ) )
{
throw new BookmarkFormatException( bookmarkString );
Expand Down
Expand Up @@ -30,10 +30,10 @@
import org.neo4j.bolt.v1.runtime.BoltConnectionFatality;
import org.neo4j.bolt.v1.runtime.BoltStateMachine;
import org.neo4j.bolt.v1.runtime.StatementProcessor;
import org.neo4j.cypher.internal.javacompat.ValueUtils;
import org.neo4j.cypher.result.QueryResult;
import org.neo4j.function.ThrowingAction;
import org.neo4j.function.ThrowingBiConsumer;
import org.neo4j.helpers.ValueUtils;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.TextValue;
Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.bolt.v1.messaging.message.RequestMessage;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.values.virtual.MapValue;

import static org.neo4j.bolt.v1.messaging.message.AckFailureMessage.ackFailure;
import static org.neo4j.bolt.v1.messaging.message.DiscardAllMessage.discardAll;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void onReset() throws RuntimeException
}

@Override
public void onRun( String statement, Map<String, Object> params )
public void onRun( String statement, MapValue params )
{
messages.add( run( statement, params ) );
}
Expand Down

0 comments on commit 1a242dd

Please sign in to comment.