Skip to content

Commit

Permalink
Incorporated feedback from PR
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd authored and henriknyman committed Jul 13, 2016
1 parent 41b21e0 commit 86c7f91
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 77 deletions.
Expand Up @@ -145,15 +145,15 @@ public String username()
}

@Override
public void markForTermination( Status status, String message )
public void markForHalting( Status status, String message )
{
delegate.markForTermination( status, message );
delegate.markForHalting( status, message );
}

@Override
public boolean willBeTerminated()
public boolean willBeHalted()
{
return delegate.willBeTerminated();
return delegate.willBeHalted();
}

@Override
Expand Down
Expand Up @@ -23,8 +23,7 @@

import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.bolt.KillableUserSession;
import org.neo4j.kernel.api.security.AuthSubject;
import org.neo4j.kernel.api.bolt.HaltableUserSession;

/**
* A user session associated with a given {@link Sessions}. The majority of methods on this
Expand All @@ -39,7 +38,7 @@
* While the operations are asynchronous, they are guaranteed to be executed in calling order. This allows you to call
* several operations in sequence without waiting for the previous operation to complete.
*/
public interface Session extends AutoCloseable, KillableUserSession
public interface Session extends AutoCloseable, HaltableUserSession
{
/**
* Callback for handling the result of requests. For a given session, callbacks will be invoked serially,
Expand Down
Expand Up @@ -25,9 +25,9 @@
import org.neo4j.bolt.v1.runtime.Session;
import org.neo4j.bolt.v1.runtime.StatementMetadata;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.bolt.KillableUserSession;
import org.neo4j.kernel.api.bolt.HaltableUserSession;

public class ErrorReportingSession extends KillableUserSession.Adapter implements Session
public class ErrorReportingSession extends HaltableUserSession.Adapter implements Session
{
private final String connectionDescriptor;
private final Neo4jError error;
Expand Down
Expand Up @@ -425,9 +425,9 @@ public State halt( SessionStateMachine ctx )
@Override
protected State onNoImplementation( SessionStateMachine ctx, String command )
{
if ( ctx.willBeTerminated() )
if ( ctx.willBeHalted() )
{
ctx.error( ctx.terminationMark.explanation() );
ctx.error( ctx.haltMark.explanation() );
}
else
{
Expand Down Expand Up @@ -634,10 +634,10 @@ public String[] fieldNames()
private final AtomicInteger interruptCounter = new AtomicInteger();

/**
* This is set when {@link #markForTermination(Status, String)} is called.
* This is set when {@link #markForHalting(Status, String)} is called.
* When this is true, all messages will be ignored, and the session stopped.
*/
protected final TerminationMark terminationMark = new TerminationMark();
protected final TerminationMark haltMark = new TerminationMark();

/** The current session state */
private State state = State.UNINITIALIZED;
Expand Down Expand Up @@ -837,28 +837,28 @@ public void interrupt()
}

@Override
public void markForTermination( Status status, String message )
public void markForHalting( Status status, String message )
{
// NOTE: This is a side-channel method call. You *cannot*
// mutate any of the regular state in the state machine
// from inside this method, it WILL lead to race conditions.
// Imagine this is always called from a separate thread, while
// the main session worker thread is actively working on mutating
// fields on the session.
terminationMark.setMark( new Neo4jError( status, message ) );
haltMark.setMark( new Neo4jError( status, message ) );

// If there is currently a transaction running, terminate it
KernelTransaction tx = this.currentTransaction;
if(tx != null)
{
tx.markForTermination();
tx.markForTermination( status );
}
}

@Override
public boolean willBeTerminated()
public boolean willBeHalted()
{
return terminationMark.get();
return haltMark.isMarked();
}

@Override
Expand Down Expand Up @@ -942,7 +942,7 @@ private void before( Object attachment, Callback cb )
cb.started( attachment );
}

if ( terminationMark.get() )
if ( haltMark.isMarked() )
{
state = state.halt( this );
}
Expand Down
Expand Up @@ -25,7 +25,7 @@
import java.util.stream.Collectors;

import org.neo4j.helpers.Service;
import org.neo4j.kernel.api.bolt.KillableUserSession;
import org.neo4j.kernel.api.bolt.HaltableUserSession;
import org.neo4j.kernel.api.bolt.SessionManager;

@Service.Implementation( SessionManager.class )
Expand All @@ -36,22 +36,22 @@ public StandardSessionManager()
super( "standard-session-tracker" );
}

private Set<KillableUserSession> sessions = new ConcurrentSet<>();
private Set<HaltableUserSession> sessions = new ConcurrentSet<>();

@Override
public void sessionActivated( KillableUserSession session )
public void sessionActivated( HaltableUserSession session )
{
sessions.add( session );
}

@Override
public void sessionHalted( KillableUserSession session )
public void sessionHalted( HaltableUserSession session )
{
sessions.remove( session );
}

@Override
public Set<KillableUserSession> getActiveSessions()
public Set<HaltableUserSession> getActiveSessions()
{
return sessions.stream().collect( Collectors.toSet() );
}
Expand Down
Expand Up @@ -19,24 +19,24 @@
*/
package org.neo4j.bolt.v1.runtime.internal;

import java.util.concurrent.atomic.AtomicReference;

public class TerminationMark
{
private boolean mark = false;
private Neo4jError explanation;
private AtomicReference<Neo4jError> explanation = new AtomicReference<>();

public synchronized void setMark( Neo4jError explanation )
public void setMark( Neo4jError explanation )
{
mark = true;
this.explanation = explanation;
this.explanation.set( explanation );
}

public synchronized boolean get()
public boolean isMarked()
{
return mark;
return explanation.get() != null;
}

public synchronized Neo4jError explanation()
public Neo4jError explanation()
{
return explanation;
return explanation.get();
}
}
Expand Up @@ -25,15 +25,15 @@
import java.util.function.Consumer;

import org.neo4j.bolt.v1.runtime.Session;
import org.neo4j.kernel.api.bolt.KillableUserSession;
import org.neo4j.kernel.api.bolt.HaltableUserSession;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;

/**
* Executes incoming session commands on a specified session.
*/
public class SessionWorker implements Runnable, KillableUserSession
public class SessionWorker implements Runnable, HaltableUserSession
{
/** Poison pill for closing the session and shutting down the worker */
public static final Consumer<Session> SHUTDOWN = session1 -> {};
Expand Down Expand Up @@ -134,14 +134,14 @@ public String username()
return session.username();
}

public void markForTermination( Status status, String message )
public void markForHalting( Status status, String message )
{
session.markForTermination( status, message );
session.markForHalting( status, message );
}

@Override
public boolean willBeTerminated()
public boolean willBeHalted()
{
return session.willBeTerminated();
return session.willBeHalted();
}
}
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.AuthSubject;

/**
* A session implementation that delegates work to a worker thread.
Expand Down Expand Up @@ -134,14 +133,14 @@ public String username()
}

@Override
public void markForTermination( Status status, String message )
public void markForHalting( Status status, String message )
{
worker.markForTermination( status, message );
worker.markForHalting( status, message );
}

@Override
public boolean willBeTerminated()
public boolean willBeHalted()
{
return worker.willBeTerminated();
return worker.willBeHalted();
}
}
@@ -1 +1 @@
org.neo4j.bolt.v1.runtime.internal.StandardSessionManager
org.neo4j.bolt.v1.runtime.internal.StandardSessionManager
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.bolt.v1.runtime.MonitoredSessions.MonitoredSession;
import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.api.bolt.KillableUserSession;
import org.neo4j.kernel.api.bolt.HaltableUserSession;
import org.neo4j.kernel.monitoring.Monitors;

import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void processingDone( long processingTime )
}
}

private static class ControlledCompletionSession extends KillableUserSession.Adapter implements Session
private static class ControlledCompletionSession extends HaltableUserSession.Adapter implements Session
{
public Callback callback;

Expand Down
Expand Up @@ -419,7 +419,7 @@ public Code code()

enum Session implements Status
{
SessionTerminated( ClientError, "The session has been terminated by the server." );
InvalidSession( ClientError, "The session is no longer available, possibly due to termination." );

private final Code code;

Expand Down
Expand Up @@ -21,30 +21,30 @@

import org.neo4j.kernel.api.exceptions.Status;

public interface KillableUserSession
public interface HaltableUserSession
{
String username();

void markForTermination( Status status, String message );
void markForHalting( Status status, String message );

boolean willBeTerminated();
boolean willBeHalted();

class Adapter implements KillableUserSession
class Adapter implements HaltableUserSession
{
@Override
public String username()
{
return "KillableUserSession.Adapter";
return "HaltableUserSession.Adapter";
}

@Override
public void markForTermination( Status status, String message )
public void markForHalting( Status status, String message )
{

}

@Override
public boolean willBeTerminated()
public boolean willBeHalted()
{
return false;
}
Expand Down
Expand Up @@ -23,9 +23,9 @@

public interface SessionManager
{
void sessionActivated( KillableUserSession session );
void sessionActivated( HaltableUserSession session );

void sessionHalted( KillableUserSession session );
void sessionHalted( HaltableUserSession session );

Set<KillableUserSession> getActiveSessions();
Set<HaltableUserSession> getActiveSessions();
}
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.graphdb.security.AuthorizationViolationException;
import org.neo4j.kernel.api.KernelTransaction;

import org.neo4j.kernel.api.bolt.KillableUserSession;
import org.neo4j.kernel.api.bolt.HaltableUserSession;
import org.neo4j.kernel.api.bolt.SessionManager;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.AuthSubject;
Expand Down Expand Up @@ -229,8 +229,8 @@ public Stream<SessionResult> listSessions()
SessionManager sessionManager = getSessionManager();
return countSessionByUsername(
sessionManager.getActiveSessions().stream()
.filter( session -> !session.willBeTerminated() )
.map( KillableUserSession::username )
.filter( session -> !session.willBeHalted() )
.map( HaltableUserSession::username )
);
}

Expand All @@ -247,12 +247,12 @@ public Stream<SessionResult> terminateSessionsForUser( @Name( "username" ) Strin
subject.getUserManager().getUser( username );

Long killCount = 0L;
for ( KillableUserSession session : getSessionManager().getActiveSessions() )
for ( HaltableUserSession session : getSessionManager().getActiveSessions() )
{
if ( session.username().equals( username ) )
{
session.markForTermination( Status.Session.SessionTerminated,
Status.Session.SessionTerminated.code().description() );
session.markForHalting( Status.Session.InvalidSession,
Status.Session.InvalidSession.code().description() );
killCount += 1;
}
}
Expand Down

0 comments on commit 86c7f91

Please sign in to comment.