Skip to content

Commit

Permalink
Add Bookmark facility to Bolt server.
Browse files Browse the repository at this point in the history
Bookmarks encode a transaction id.

We use them to support Causal Consistency, by supplying a bookmark
generated on one server to a subsquent transaction on a second
server in the same cluster. The second server will ensure that the
subsequent transaction sees a state at least as up to date as
the state seen on the first server, or throw an exception in case
of timeout.
  • Loading branch information
technige authored and apcj committed Aug 25, 2016
1 parent 1ff24a0 commit 2f57d8f
Show file tree
Hide file tree
Showing 34 changed files with 984 additions and 508 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.Neo4jError; import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.Record; import org.neo4j.bolt.v1.runtime.spi.Record;
import org.neo4j.bolt.v1.runtime.spi.RecordStream; import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import java.io.IOException; import java.io.IOException;
Expand All @@ -51,7 +51,7 @@ public BoltMessageRouter( Log log, BoltWorker worker, BoltResponseMessageHandler
{ {
this.initHandler = new InitHandler( output, onEachCompletedRequest, log ); this.initHandler = new InitHandler( output, onEachCompletedRequest, log );
this.runHandler = new RunHandler( output, onEachCompletedRequest, log ); this.runHandler = new RunHandler( output, onEachCompletedRequest, log );
this.pullAllHandler = new PullAllHandler( output, onEachCompletedRequest, log ); this.pullAllHandler = new ResultHandler( output, onEachCompletedRequest, log );
this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, log ); this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, log );


this.worker = worker; this.worker = worker;
Expand Down Expand Up @@ -139,15 +139,15 @@ public void onStart()
} }


@Override @Override
public void addRecords( RecordStream records ) throws Exception public void onRecords( BoltResult result, boolean pull ) throws Exception
{ {
// Overridden if records are returned, therefore // Overridden if records are returned, therefore
// should fail if called but not overridden. // should fail if called but not overridden.
assert false; assert false;
} }


@Override @Override
public void addMetadata( String key, Object value ) public void onMetadata( String key, Object value )
{ {
metadata.put( key, value ); metadata.put( key, value );
} }
Expand Down Expand Up @@ -224,23 +224,25 @@ private static class RunHandler extends MessageProcessingHandler
} }


} }

private static class ResultHandler extends MessageProcessingHandler
private static class PullAllHandler extends MessageProcessingHandler
{ {
PullAllHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, Log log ) ResultHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, Log log )
{ {
super( handler, onCompleted, log ); super( handler, onCompleted, log );
} }


@Override @Override
public void addRecords( RecordStream stream ) throws Exception public void onRecords( final BoltResult result, final boolean pull ) throws Exception
{ {
stream.accept( new RecordStream.Visitor() result.accept( new BoltResult.Visitor()
{ {
@Override @Override
public void visit( Record record ) throws Exception public void visit( Record record ) throws Exception
{ {
handler.onRecord( record ); if ( pull )
{
handler.onRecord( record );
}
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/ */
package org.neo4j.bolt.v1.runtime; package org.neo4j.bolt.v1.runtime;


import org.neo4j.bolt.v1.runtime.spi.RecordStream; import org.neo4j.bolt.v1.runtime.spi.BoltResult;


/** /**
* Callback for handling the result of requests. For a given session, callbacks will be invoked serially, * Callback for handling the result of requests. For a given session, callbacks will be invoked serially,
Expand All @@ -31,9 +31,9 @@ public interface BoltResponseHandler
/** Called exactly once, before the request is processed by the Session State Machine */ /** Called exactly once, before the request is processed by the Session State Machine */
void onStart(); void onStart();


void addRecords( RecordStream records ) throws Exception; void onRecords( BoltResult result, boolean pull ) throws Exception;


void addMetadata( String key, Object value ); void onMetadata( String key, Object value );


/** Called when the state machine ignores an operation, because it is waiting for an error to be acknowledged */ /** Called when the state machine ignores an operation, because it is waiting for an error to be acknowledged */
void markIgnored(); void markIgnored();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.bolt.security.auth.AuthenticationResult; import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.v1.runtime.cypher.StatementMetadata; import org.neo4j.bolt.v1.runtime.cypher.StatementMetadata;
import org.neo4j.bolt.v1.runtime.cypher.StatementProcessor; import org.neo4j.bolt.v1.runtime.cypher.StatementProcessor;
import org.neo4j.bolt.v1.runtime.spi.RecordStream; import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.function.ThrowingConsumer; import org.neo4j.function.ThrowingConsumer;
import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine; import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine;
import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.KernelException;
Expand All @@ -52,7 +52,7 @@
* (i.e. a message sent out of sequence) will result in an immediate failure * (i.e. a message sent out of sequence) will result in an immediate failure
* response and a closed connection. * response and a closed connection.
*/ */
public class BoltStateMachine implements ManagedBoltStateMachine public class BoltStateMachine implements AutoCloseable, ManagedBoltStateMachine
{ {
private final String id = UUID.randomUUID().toString(); private final String id = UUID.randomUUID().toString();
private final Runnable onClose; private final Runnable onClose;
Expand Down Expand Up @@ -320,7 +320,7 @@ public State init( BoltStateMachine machine, String userAgent,
machine.ctx.init( authResult ); machine.ctx.init( authResult );
if ( authResult.credentialsExpired() ) if ( authResult.credentialsExpired() )
{ {
machine.ctx.addMetadata( "credentials_expired", true ); machine.ctx.onMetadata( "credentials_expired", true );
} }
machine.spi.udcRegisterClient( userAgent ); machine.spi.udcRegisterClient( userAgent );
if ( authToken.containsKey( PRINCIPAL ) ) if ( authToken.containsKey( PRINCIPAL ) )
Expand Down Expand Up @@ -365,7 +365,7 @@ public State run( BoltStateMachine machine, String statement,
try try
{ {
StatementMetadata statementMetadata = machine.ctx.statementProcessor.run( statement, params ); StatementMetadata statementMetadata = machine.ctx.statementProcessor.run( statement, params );
machine.ctx.addMetadata( "fields", statementMetadata.fieldNames() ); machine.ctx.onMetadata( "fields", statementMetadata.fieldNames() );
return STREAMING; return STREAMING;
} }
catch ( Throwable e ) catch ( Throwable e )
Expand Down Expand Up @@ -431,7 +431,7 @@ public State pullAll( BoltStateMachine machine )
try try
{ {
machine.ctx.statementProcessor.streamResult( recordStream -> { machine.ctx.statementProcessor.streamResult( recordStream -> {
machine.ctx.responseHandler.addRecords( recordStream ); machine.ctx.responseHandler.onRecords( recordStream, true );
} ); } );
return READY; return READY;
} }
Expand All @@ -448,7 +448,7 @@ public State discardAll( BoltStateMachine machine )
try try
{ {
machine.ctx.statementProcessor.streamResult( recordStream -> { machine.ctx.statementProcessor.streamResult( recordStream -> {
// discard records machine.ctx.responseHandler.onRecords( recordStream, false );
} ); } );


return READY; return READY;
Expand Down Expand Up @@ -702,20 +702,20 @@ public void onStart()
} }
} }


public void addRecords( RecordStream record ) throws Exception public void onRecords( BoltResult result, boolean pull ) throws Exception
{ {
if ( responseHandler != null ) if ( responseHandler != null )
{ {
responseHandler.addRecords( record ); responseHandler.onRecords( result, pull );
} }
} }


@Override @Override
public void addMetadata( String key, Object value ) public void onMetadata( String key, Object value )
{ {
if ( responseHandler != null ) if ( responseHandler != null )
{ {
responseHandler.addMetadata( key, value ); responseHandler.onMetadata( key, value );
} }
} }


Expand Down Expand Up @@ -774,7 +774,7 @@ public StatementMetadata run( String statement, Map<String, Object> params ) thr
} }


@Override @Override
public void streamResult( ThrowingConsumer<RecordStream, Exception> resultConsumer ) throws Exception public void streamResult( ThrowingConsumer<BoltResult, Exception> resultConsumer ) throws Exception
{ {
throw new UnsupportedOperationException( "Unable to stream any results." ); throw new UnsupportedOperationException( "Unable to stream any results." );
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,18 +24,8 @@
import org.neo4j.bolt.security.auth.Authentication; import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.security.auth.AuthenticationException; import org.neo4j.bolt.security.auth.AuthenticationException;
import org.neo4j.bolt.security.auth.AuthenticationResult; import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker; import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.security.AuthSubject;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;
import org.neo4j.udc.UsageDataKeys; import org.neo4j.udc.UsageDataKeys;


Expand All @@ -46,69 +36,21 @@ class BoltStateMachineSPI implements BoltStateMachine.SPI
private final ErrorReporter errorReporter; private final ErrorReporter errorReporter;
private final BoltConnectionTracker connectionTracker; private final BoltConnectionTracker connectionTracker;
private final Authentication authentication; private final Authentication authentication;

private final TransactionStateMachine.SPI transactionSpi;
final TransactionStateMachine.SPI transactionSpi;


BoltStateMachineSPI( String connectionDescriptor, BoltStateMachineSPI( String connectionDescriptor,
UsageData usageData, UsageData usageData,
GraphDatabaseAPI db,
QueryExecutionEngine queryExecutionEngine,
LogService logging, LogService logging,
Authentication authentication, Authentication authentication,
ThreadToStatementContextBridge txBridge, BoltConnectionTracker connectionTracker,
StatementRunner statementRunner, TransactionStateMachine.SPI transactionStateMachineSPI )
BoltConnectionTracker connectionTracker )
{ {
this.connectionDescriptor = connectionDescriptor; this.connectionDescriptor = connectionDescriptor;
this.usageData = usageData; this.usageData = usageData;
this.errorReporter = new ErrorReporter( logging ); this.errorReporter = new ErrorReporter( logging );
this.connectionTracker = connectionTracker; this.connectionTracker = connectionTracker;
Log log = logging.getInternalLog( BoltStateMachine.class );
this.authentication = authentication; this.authentication = authentication;
this.transactionSpi = new TransactionStateMachine.SPI() this.transactionSpi = transactionStateMachineSPI;
{
@Override
public KernelTransaction beginTransaction( AuthSubject authSubject )
{
db.beginTransaction( KernelTransaction.Type.explicit, authSubject );
return txBridge.getKernelTransactionBoundToThisThread( false );
}

@Override
public void bindTransactionToCurrentThread( KernelTransaction tx )
{
txBridge.bindTransactionToCurrentThread( tx );
}

@Override
public void unbindTransactionFromCurrentThread()
{
txBridge.unbindTransactionFromCurrentThread();
}

@Override
public boolean isPeriodicCommit( String query )
{
return queryExecutionEngine.isPeriodicCommit( query );
}

@Override
public Result executeQuery( String querySource,
AuthSubject authSubject,
String statement,
Map<String, Object> params ) throws QueryExecutionKernelException
{
try
{
return statementRunner.run( querySource, authSubject, statement, params );
}
catch ( KernelException e )
{
throw new QueryExecutionKernelException( e );
}
}

};
} }


@Override @Override
Expand Down Expand Up @@ -146,6 +88,7 @@ public AuthenticationResult authenticate( Map<String,Object> authToken ) throws
{ {
return authentication.authenticate( authToken ); return authentication.authenticate( authToken );
} }

@Override @Override
public void udcRegisterClient( String clientName ) public void udcRegisterClient( String clientName )
{ {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/ */
package org.neo4j.bolt.v1.runtime; package org.neo4j.bolt.v1.runtime;


import java.util.function.Supplier;

import org.neo4j.bolt.security.auth.Authentication; import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.v1.runtime.cypher.CypherStatementRunner; import org.neo4j.bolt.v1.runtime.cypher.CypherStatementRunner;
import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;
Expand All @@ -27,6 +29,7 @@
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.QueryExecutionEngine; import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -44,6 +47,7 @@ public class LifecycleManagedBoltFactory extends LifecycleAdapter implements Bol


private QueryExecutionEngine queryExecutionEngine; private QueryExecutionEngine queryExecutionEngine;
private GraphDatabaseQueryService queryService; private GraphDatabaseQueryService queryService;
private TransactionIdStore transactionIdStore;


public LifecycleManagedBoltFactory( GraphDatabaseAPI gds, UsageData usageData, LogService logging, public LifecycleManagedBoltFactory( GraphDatabaseAPI gds, UsageData usageData, LogService logging,
ThreadToStatementContextBridge txBridge, Authentication authentication, ThreadToStatementContextBridge txBridge, Authentication authentication,
Expand All @@ -69,6 +73,7 @@ public void start() throws Throwable
DependencyResolver dependencyResolver = gds.getDependencyResolver(); DependencyResolver dependencyResolver = gds.getDependencyResolver();
queryExecutionEngine = dependencyResolver.resolveDependency( QueryExecutionEngine.class ); queryExecutionEngine = dependencyResolver.resolveDependency( QueryExecutionEngine.class );
queryService = dependencyResolver.resolveDependency( GraphDatabaseQueryService.class ); queryService = dependencyResolver.resolveDependency( GraphDatabaseQueryService.class );
transactionIdStore = dependencyResolver.resolveDependency( TransactionIdStore.class );
life.start(); life.start();
} }


Expand All @@ -89,8 +94,10 @@ public BoltStateMachine newMachine( String connectionDescriptor, Runnable onClos
{ {
final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, txBridge, final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, txBridge,
queryService ); queryService );
BoltStateMachine.SPI spi = new BoltStateMachineSPI( connectionDescriptor, usageData, gds, TransactionStateMachine.SPI transactionSPI = new TransactionStateMachineSPI( gds, txBridge,
queryExecutionEngine, logging, authentication, txBridge, statementRunner, connectionTracker ); queryExecutionEngine, statementRunner, transactionIdStore );
return new BoltStateMachine( spi, onClose ); BoltStateMachine.SPI boltSPI = new BoltStateMachineSPI( connectionDescriptor, usageData,
logging, authentication, connectionTracker, transactionSPI );
return new BoltStateMachine( boltSPI, onClose );
} }
} }
Loading

0 comments on commit 2f57d8f

Please sign in to comment.