Skip to content

Commit

Permalink
Merge pull request #7532 from fickludd/3.1-manage-bolt-sessions
Browse files Browse the repository at this point in the history
Procedures for managing bolt sessions
  • Loading branch information
henriknyman committed Jul 14, 2016
2 parents 7646af4 + 0a34e88 commit 7ee8425
Show file tree
Hide file tree
Showing 28 changed files with 965 additions and 21 deletions.
Expand Up @@ -57,6 +57,7 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings.BoltConnector;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.api.bolt.SessionTracker;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Internal;
import org.neo4j.kernel.extension.KernelExtensionFactory;
Expand Down Expand Up @@ -121,6 +122,8 @@ public interface Dependencies
Monitors monitors();

ThreadToStatementContextBridge txBridge();

SessionTracker sessionManager();
}

public BoltKernelExtension()
Expand All @@ -147,7 +150,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
new MonitoredSessions( dependencies.monitors(),
new ThreadedSessions(
life.add( new StandardSessions( api, dependencies.usageData(), logging,
dependencies.txBridge() ) ),
dependencies.txBridge(), dependencies.sessionManager() ) ),
scheduler, logging ), Clock.systemUTC() );

List<ProtocolInitializer> connectors = config
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.monitoring.Monitors;

/**
Expand Down Expand Up @@ -137,6 +138,24 @@ public void interrupt()
delegate.interrupt();
}

@Override
public String username()
{
return delegate.username();
}

@Override
public void markForHalting( Status status, String message )
{
delegate.markForHalting( status, message );
}

@Override
public boolean willBeHalted()
{
return delegate.willBeHalted();
}

@Override
public void close()
{
Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.bolt.HaltableUserSession;

/**
* A user session associated with a given {@link Sessions}. The majority of methods on this
Expand All @@ -37,7 +38,7 @@
* While the operations are asynchronous, they are guaranteed to be executed in calling order. This allows you to call
* several operations in sequence without waiting for the previous operation to complete.
*/
public interface Session extends AutoCloseable
public interface Session extends AutoCloseable, HaltableUserSession
{
/**
* Callback for handling the result of requests. For a given session, callbacks will be invoked serially,
Expand Down
Expand Up @@ -25,8 +25,9 @@
import org.neo4j.bolt.v1.runtime.Session;
import org.neo4j.bolt.v1.runtime.StatementMetadata;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.bolt.HaltableUserSession;

public class ErrorReportingSession implements Session
public class ErrorReportingSession extends HaltableUserSession.Adapter implements Session
{
private final String connectionDescriptor;
private final Neo4jError error;
Expand Down
Expand Up @@ -143,6 +143,8 @@ public static Status codeFromString( String codeStr )
return Status.Network.valueOf( error );
case "Security":
return Status.Security.valueOf( error );
case "Session":
return Status.Session.valueOf( error );
default:
return Status.General.UnknownError;
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.bolt.SessionTracker;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.AccessMode;
Expand Down Expand Up @@ -82,6 +83,7 @@ public State init( SessionStateMachine ctx, String clientName, Map<String,Object
ctx.result( authResult.credentialsExpired() );
ctx.spi.udcRegisterClient( clientName );
ctx.setQuerySourceFromClientNameAndPrincipal( clientName, authToken.get( AuthToken.PRINCIPAL ) );
ctx.spi.sessionActivated( ctx );
return IDLE;
}
catch ( AuthenticationException e )
Expand Down Expand Up @@ -423,7 +425,14 @@ public State halt( SessionStateMachine ctx )
@Override
protected State onNoImplementation( SessionStateMachine ctx, String command )
{
ctx.ignored();
if ( ctx.willBeHalted() )
{
ctx.error( ctx.haltMark.explanation() );
}
else
{
ctx.ignored();
}
return STOPPED;
}
};
Expand Down Expand Up @@ -540,6 +549,7 @@ public State halt( SessionStateMachine ctx )
ctx.error( Neo4jError.from( e ) );
}
}
ctx.spi.sessionHalted( ctx );
return STOPPED;
}

Expand Down Expand Up @@ -623,6 +633,12 @@ public String[] fieldNames()
*/
private final AtomicInteger interruptCounter = new AtomicInteger();

/**
* This is set when {@link #markForHalting(Status, String)} is called.
* When this is true, all messages will be ignored, and the session stopped.
*/
protected final TerminationMark haltMark = new TerminationMark();

/** The current session state */
private State state = State.UNINITIALIZED;

Expand Down Expand Up @@ -685,11 +701,14 @@ RecordStream run( SessionStateMachine ctx, String statement, Map<String, Object>
AuthenticationResult authenticate( Map<String, Object> authToken ) throws AuthenticationException;
void udcRegisterClient( String clientName );
Statement currentStatement();
void sessionActivated( Session session );
void sessionHalted( Session session );
}
public SessionStateMachine( String connectionDescriptor, UsageData usageData, GraphDatabaseFacade db, ThreadToStatementContextBridge txBridge,
StatementRunner engine, LogService logging, Authentication authentication )
StatementRunner engine, LogService logging, Authentication authentication, SessionTracker sessionTracker )
{
this( new StandardStateMachineSPI( connectionDescriptor, usageData, db, engine, logging, authentication, txBridge ));
this( new StandardStateMachineSPI( connectionDescriptor, usageData, db, engine, logging, authentication,
txBridge, sessionTracker ));
}
public SessionStateMachine( SPI spi )
{
Expand Down Expand Up @@ -817,6 +836,37 @@ public void interrupt()
}
}

@Override
public void markForHalting( Status status, String message )
{
// NOTE: This is a side-channel method call. You *cannot*
// mutate any of the regular state in the state machine
// from inside this method, it WILL lead to race conditions.
// Imagine this is always called from a separate thread, while
// the main session worker thread is actively working on mutating
// fields on the session.
haltMark.setMark( new Neo4jError( status, message ) );

// If there is currently a transaction running, terminate it
KernelTransaction tx = this.currentTransaction;
if(tx != null)
{
tx.markForTermination( status );
}
}

@Override
public boolean willBeHalted()
{
return haltMark.isMarked();
}

@Override
public String username()
{
return authSubject.name();
}

@Override
public void close()
{
Expand Down Expand Up @@ -892,7 +942,11 @@ private void before( Object attachment, Callback cb )
cb.started( attachment );
}

if( interruptCounter.get() > 0 )
if ( haltMark.isMarked() )
{
state = state.halt( this );
}
else if ( interruptCounter.get() > 0 )
{
// Force into interrupted state. This is how we 'discover'
// that `interrupt` has been called.
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.bolt.v1.runtime.Sessions;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.api.bolt.SessionTracker;
import org.neo4j.kernel.api.security.AuthManager;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
Expand All @@ -46,12 +47,13 @@ public class StandardSessions extends LifecycleAdapter implements Sessions
private final UsageData usageData;
private final LogService logging;
private final Authentication authentication;
private final SessionTracker sessionTracker;

private CypherStatementRunner statementRunner;
private ThreadToStatementContextBridge txBridge;

public StandardSessions( GraphDatabaseFacade gds, UsageData usageData, LogService logging,
ThreadToStatementContextBridge txBridge)
ThreadToStatementContextBridge txBridge, SessionTracker sessionTracker )
{
this.gds = gds;
this.usageData = usageData;
Expand All @@ -60,6 +62,7 @@ public StandardSessions( GraphDatabaseFacade gds, UsageData usageData, LogServic
DependencyResolver dependencyResolver = gds.getDependencyResolver();
this.txBridge = dependencyResolver.resolveDependency( ThreadToStatementContextBridge.class );
this.authentication = authentication( dependencyResolver );
this.sessionTracker = sessionTracker;
}

@Override
Expand Down Expand Up @@ -92,7 +95,8 @@ public void shutdown() throws Throwable
@Override
public Session newSession( String connectionDescriptor, boolean isEncrypted )
{
return new SessionStateMachine( connectionDescriptor, usageData, gds, txBridge, statementRunner, logging, authentication );
return new SessionStateMachine( connectionDescriptor, usageData, gds, txBridge, statementRunner, logging,
authentication, sessionTracker );
}

private Authentication authentication( DependencyResolver dependencyResolver )
Expand Down
Expand Up @@ -24,9 +24,11 @@
import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.security.auth.AuthenticationException;
import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.v1.runtime.Session;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.concurrent.DecayingFlags;
import org.neo4j.kernel.api.bolt.SessionTracker;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
Expand All @@ -49,9 +51,11 @@ class StandardStateMachineSPI implements SessionStateMachine.SPI
private final Authentication authentication;
private final ThreadToStatementContextBridge txBridge;
private final DecayingFlags featureUsage;
private final SessionTracker sessionTracker;

StandardStateMachineSPI( String connectionDescriptor, UsageData usageData, GraphDatabaseFacade db, StatementRunner statementRunner,
LogService logging, Authentication authentication, ThreadToStatementContextBridge txBridge )
LogService logging, Authentication authentication, ThreadToStatementContextBridge txBridge,
SessionTracker sessionTracker )
{
this.connectionDescriptor = connectionDescriptor;
this.usageData = usageData;
Expand All @@ -62,6 +66,7 @@ class StandardStateMachineSPI implements SessionStateMachine.SPI
this.errorReporter = new ErrorReporter( logging );
this.log = logging.getInternalLog( SessionStateMachine.class );
this.authentication = authentication;
this.sessionTracker = sessionTracker;
}

@Override
Expand Down Expand Up @@ -126,4 +131,16 @@ public Statement currentStatement()
{
return txBridge.get();
}

@Override
public void sessionActivated( Session session )
{
sessionTracker.sessionActivated( session );
}

@Override
public void sessionHalted( Session session )
{
sessionTracker.sessionHalted( session );
}
}
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime.internal;

import java.util.concurrent.atomic.AtomicReference;

public class TerminationMark
{
private AtomicReference<Neo4jError> explanation = new AtomicReference<>();

public void setMark( Neo4jError explanation )
{
this.explanation.set( explanation );
}

public boolean isMarked()
{
return explanation.get() != null;
}

public Neo4jError explanation()
{
return explanation.get();
}
}
Expand Up @@ -25,13 +25,15 @@
import java.util.function.Consumer;

import org.neo4j.bolt.v1.runtime.Session;
import org.neo4j.kernel.api.bolt.HaltableUserSession;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

/**
* Executes incoming session commands on a specified session.
*/
public class SessionWorker implements Runnable
public class SessionWorker implements Runnable, HaltableUserSession
{
/** Poison pill for closing the session and shutting down the worker */
public static final Consumer<Session> SHUTDOWN = session1 -> {};
Expand Down Expand Up @@ -126,4 +128,20 @@ public void interrupt()
{
session.interrupt();
}

public String username()
{
return session.username();
}

public void markForHalting( Status status, String message )
{
session.markForHalting( status, message );
}

@Override
public boolean willBeHalted()
{
return session.willBeHalted();
}
}

0 comments on commit 7ee8425

Please sign in to comment.