Skip to content

Commit

Permalink
Query logging of user name and connection details for bolt
Browse files Browse the repository at this point in the history
  • Loading branch information
boggle committed Mar 11, 2016
1 parent 39b4dc5 commit a5ea319
Show file tree
Hide file tree
Showing 27 changed files with 223 additions and 92 deletions.
Expand Up @@ -30,6 +30,7 @@
import java.time.Clock; import java.time.Clock;
import java.util.List; import java.util.List;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function;


import org.neo4j.bolt.security.ssl.Certificates; import org.neo4j.bolt.security.ssl.Certificates;
import org.neo4j.bolt.security.ssl.KeyStoreFactory; import org.neo4j.bolt.security.ssl.KeyStoreFactory;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;


import static java.lang.String.format;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.neo4j.collection.primitive.Primitive.longObjectMap; import static org.neo4j.collection.primitive.Primitive.longObjectMap;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT;
Expand Down Expand Up @@ -98,6 +100,51 @@ public static class Settings
setting( "org.neo4j.server.webserver.address", STRING, setting( "org.neo4j.server.webserver.address", STRING,
"localhost", illegalValueMessage( "Must be a valid hostname", org.neo4j.kernel.configuration "localhost", illegalValueMessage( "Must be a valid hostname", org.neo4j.kernel.configuration
.Settings.matches( ANY ) ) ); .Settings.matches( ANY ) ) );


public static <T> Setting<T> connector( int i, Setting<T> setting )
{
String name = format( "dbms.connector.%s", i );
return new Setting<T>()
{
@Override
public String name()
{
return format( "%s.%s", name, setting.name() );
}

@Override
public String getDefaultValue()
{
return setting.getDefaultValue();
}

@Override
public T apply( Function<String,String> settings )
{
return setting.apply( settings );
}

@Override
public int hashCode()
{
return name().hashCode();
}

@Override
public boolean equals( Object obj )
{
return obj instanceof Setting<?> && ((Setting<?>) obj).name().equals( name() );
}
};
}
}

public enum EncryptionLevel
{
REQUIRED,
OPTIONAL,
DISABLED

This comment has been minimized.

Copy link
@benbc

benbc Mar 13, 2016

Contributor

@boggle This looks like a bad merge to me. These were moved out into another class and you've reinstated them. They don't appear to be used. Don't bother fixing it -- I've removed them on a commit I'll merge soon; but can you confirm that you think it's an error?

This comment has been minimized.

Copy link
@boggle

boggle Mar 13, 2016

Author Member

Most likely a merge/rebase gone wrong, thanks for handling this.

} }


public interface Dependencies public interface Dependencies
Expand Down Expand Up @@ -206,7 +253,10 @@ private PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> newVers
PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions = longObjectMap(); PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions = longObjectMap();
availableVersions.put( availableVersions.put(
BoltProtocolV1.VERSION, BoltProtocolV1.VERSION,
( channel, isEncrypted ) -> new BoltProtocolV1( logging, sessions.newSession( isEncrypted ), channel ) ( channel, isEncrypted ) -> {
String descriptor = format( "\tclient:%s\tserver:%s", channel.remoteAddress(), channel.localAddress() );
return new BoltProtocolV1( logging, sessions.newSession( descriptor, isEncrypted ), channel );
}
); );
return availableVersions; return availableVersions;
} }
Expand All @@ -231,15 +281,14 @@ private KeyStoreInformation createKeyStore( Configuration connector, Log log )
if ( !certificatePath.exists() ) if ( !certificatePath.exists() )
{ {
throw new IllegalStateException( throw new IllegalStateException(
String.format( format( "TLS private key found, but missing certificate at '%s'. Cannot start server without " +
"TLS private key found, but missing certificate at '%s'. Cannot start server without " +
"certificate.", "certificate.",
certificatePath ) ); certificatePath ) );
} }
if ( !privateKeyPath.exists() ) if ( !privateKeyPath.exists() )
{ {
throw new IllegalStateException( throw new IllegalStateException(
String.format( "TLS certificate found, but missing key at '%s'. Cannot start server without key.", format( "TLS certificate found, but missing key at '%s'. Cannot start server without key.",
privateKeyPath ) ); privateKeyPath ) );
} }


Expand Down
Expand Up @@ -48,13 +48,13 @@ public MonitoredSessions( Monitors monitors, Sessions delegate, Clock clock )
} }


@Override @Override
public Session newSession( boolean isEncrypted ) public Session newSession( String connectionDescriptor, boolean isEncrypted )
{ {
if( monitors.hasListeners( SessionMonitor.class ) ) if( monitors.hasListeners( SessionMonitor.class ) )
{ {
return new MonitoredSession( monitor, delegate.newSession( isEncrypted ), clock ); return new MonitoredSession( monitor, delegate.newSession( connectionDescriptor, isEncrypted ), clock );
} }
return delegate.newSession(isEncrypted); return delegate.newSession( connectionDescriptor, isEncrypted );
} }


static class MonitoredSession implements Session static class MonitoredSession implements Session
Expand All @@ -76,6 +76,11 @@ public String key()
return delegate.key(); return delegate.key();
} }


public String connectionDescriptor()
{
return delegate.connectionDescriptor();
}

@Override @Override
public <A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback ) public <A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback )
{ {
Expand Down
Expand Up @@ -122,6 +122,9 @@ public static <V, A> Callback<V,A> noop()
/** A session id that is unique for this database instance */ /** A session id that is unique for this database instance */
String key(); String key();


/** A descriptor for the underlying medium (connection etc) via which this session is being used */
String connectionDescriptor();

/** /**
* Initialize the session. * Initialize the session.
*/ */
Expand Down
Expand Up @@ -24,10 +24,15 @@
*/ */
public interface Sessions public interface Sessions
{ {
default Session newSession() default Session newSession( String connectionDescriptor )
{ {
return newSession( false ); return newSession( connectionDescriptor, false );
} }


Session newSession( boolean isEncrypted ); /**
* @param connectionDescriptor a string for logging that describes the underlying medium over which this session is used (TCP, HTTP, ...)
* @param isEncrypted if the session has to be encrypted
* @return a new session
*/
Session newSession( String connectionDescriptor, boolean isEncrypted );
} }
Expand Up @@ -33,13 +33,14 @@ public EncryptionRequiredSessions( Sessions sessions )
} }


@Override @Override
public Session newSession( boolean isEncrypted ) public Session newSession( String connectionDescriptor, boolean isEncrypted )
{ {
if ( !isEncrypted ) if ( !isEncrypted )
{ {
return new ErrorReportingSession( return new ErrorReportingSession(
connectionDescriptor,
new Neo4jError( Status.Security.EncryptionRequired, "This server requires a TLS encrypted connection." ) ); new Neo4jError( Status.Security.EncryptionRequired, "This server requires a TLS encrypted connection." ) );
} }
return delegate.newSession( isEncrypted ); return delegate.newSession( connectionDescriptor, isEncrypted );
} }
} }
Expand Up @@ -28,13 +28,15 @@


public class ErrorReportingSession implements Session public class ErrorReportingSession implements Session
{ {
private final String connectionDescriptor;
private final Neo4jError error; private final Neo4jError error;
private final String id; private final String id;


public ErrorReportingSession( Neo4jError error ) public ErrorReportingSession( String connectionDescriptor, Neo4jError error )
{ {
this.error = error; this.error = error;
this.id = UUID.randomUUID().toString(); this.id = UUID.randomUUID().toString();
this.connectionDescriptor = connectionDescriptor;
} }


@Override @Override
Expand All @@ -43,6 +45,12 @@ public String key()
return id; return id;
} }


@Override
public String connectionDescriptor()
{
return connectionDescriptor;
}

private <V, A> void reportError( A attachment, Callback<V,A> callback ) private <V, A> void reportError( A attachment, Callback<V,A> callback )
{ {
if ( callback != null ) if ( callback != null )
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.neo4j.kernel.impl.query.QuerySession; import org.neo4j.kernel.impl.query.QuerySession;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;


import static java.lang.String.format;
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit; import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;
import static org.neo4j.kernel.api.KernelTransaction.Type.implicit; import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;


Expand Down Expand Up @@ -75,6 +76,7 @@ public State init( SessionStateMachine ctx, String clientName, Map<String,Object
ctx.accessMode = authResult.getAccessMode(); ctx.accessMode = authResult.getAccessMode();
ctx.result( authResult.credentialsExpired() ); ctx.result( authResult.credentialsExpired() );
ctx.spi.udcRegisterClient( clientName ); ctx.spi.udcRegisterClient( clientName );
ctx.setQuerySourceFromClientNameAndPrincipal( clientName, authToken.get( Authentication.PRINCIPAL ) );
return IDLE; return IDLE;
} }
catch ( AuthenticationException e ) catch ( AuthenticationException e )
Expand All @@ -91,7 +93,7 @@ public State init( SessionStateMachine ctx, String clientName, Map<String,Object
protected State onNoImplementation( SessionStateMachine ctx, String command ) protected State onNoImplementation( SessionStateMachine ctx, String command )
{ {
ctx.error( new Neo4jError( Status.Request.Invalid, "No operations allowed until you send an " + ctx.error( new Neo4jError( Status.Request.Invalid, "No operations allowed until you send an " +
"INIT message." ) ); "INIT message." ) );
return halt( ctx ); return halt( ctx );
} }
}, },
Expand Down Expand Up @@ -467,7 +469,7 @@ State error( SessionStateMachine ctx, Neo4jError err )
// Is this error bad enough that we should roll back, or did the failure occur in an implicit // Is this error bad enough that we should roll back, or did the failure occur in an implicit
// transaction? // transaction?
if( err.status().code().classification().rollbackTransaction() || if( err.status().code().classification().rollbackTransaction() ||
ctx.currentTransaction.transactionType() == implicit ) ctx.currentTransaction.transactionType() == implicit )
{ {
try try
{ {
Expand All @@ -477,7 +479,7 @@ State error( SessionStateMachine ctx, Neo4jError err )
catch ( Throwable t ) catch ( Throwable t )
{ {
ctx.spi.reportError( "While handling '" + err.status() + "', a second failure occurred when " + ctx.spi.reportError( "While handling '" + err.status() + "', a second failure occurred when " +
"rolling back transaction: " + t.getMessage(), t ); "rolling back transaction: " + t.getMessage(), t );
} }
finally finally
{ {
Expand Down Expand Up @@ -529,6 +531,9 @@ public String[] fieldNames()
/** The current transaction, if present */ /** The current transaction, if present */
private KernelTransaction currentTransaction; private KernelTransaction currentTransaction;


/** The current query source, if initialized */
private String currentQuerySource;

/** Callback poised to receive the next response */ /** Callback poised to receive the next response */
private Callback currentCallback; private Callback currentCallback;


Expand Down Expand Up @@ -557,6 +562,7 @@ public String[] fieldNames()
*/ */
interface SPI interface SPI
{ {
String connectionDescriptor();
void reportError( Neo4jError err ); void reportError( Neo4jError err );
void reportError( String message, Throwable cause ); void reportError( String message, Throwable cause );
KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode ); KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode );
Expand All @@ -568,13 +574,11 @@ RecordStream run( SessionStateMachine ctx, String statement, Map<String, Object>
void udcRegisterClient( String clientName ); void udcRegisterClient( String clientName );
Statement currentStatement(); Statement currentStatement();
} }

public SessionStateMachine( String connectionDescriptor, UsageData usageData, GraphDatabaseFacade db, ThreadToStatementContextBridge txBridge,
public SessionStateMachine( UsageData usageData, GraphDatabaseFacade db, ThreadToStatementContextBridge txBridge,
StatementRunner engine, LogService logging, Authentication authentication ) StatementRunner engine, LogService logging, Authentication authentication )
{ {
this( new StandardStateMachineSPI( usageData, db, engine, logging, authentication, txBridge )); this( new StandardStateMachineSPI( connectionDescriptor, usageData, db, engine, logging, authentication, txBridge ));
} }

public SessionStateMachine( SPI spi ) public SessionStateMachine( SPI spi )
{ {
this.spi = spi; this.spi = spi;
Expand All @@ -587,6 +591,22 @@ public String key()
return id; return id;
} }


public String connectionDescriptor()
{
return spi.connectionDescriptor();
}

private String querySource()
{
return currentQuerySource;
}

private void setQuerySourceFromClientNameAndPrincipal( String clientName, Object principal )
{
String principalName = principal == null ? "null" : principal.toString();
currentQuerySource = format( "bolt\t%s\t%s\t%s>", principalName, clientName, connectionDescriptor() );
}

@Override @Override
public <A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback ) public <A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback )
{ {
Expand Down Expand Up @@ -712,15 +732,7 @@ public QuerySession createSession( GraphDatabaseQueryService service, PropertyCo
service.beginTransaction( currentTransaction.transactionType(), currentTransaction.mode() ); service.beginTransaction( currentTransaction.transactionType(), currentTransaction.mode() );
Neo4jTransactionalContext transactionalContext = Neo4jTransactionalContext transactionalContext =
new Neo4jTransactionalContext( service, transaction, spi.currentStatement(), locker ); new Neo4jTransactionalContext( service, transaction, spi.currentStatement(), locker );

return new BoltQuerySession( transactionalContext, querySource() );
return new QuerySession( transactionalContext )
{
@Override
public String toString()
{
return "bolt";
}
};
} }


public State state() public State state()
Expand Down Expand Up @@ -825,4 +837,21 @@ private void ignored()
currentCallback.ignored( currentAttachment ); currentCallback.ignored( currentAttachment );
} }
} }

private class BoltQuerySession extends QuerySession
{
private final String querySource;

public BoltQuerySession( Neo4jTransactionalContext transactionalContext, String querySource )
{
super( transactionalContext );
this.querySource = querySource;
}

@Override
public String toString()
{
return format( "bolt-session\t%s", querySource );
}
}
} }
Expand Up @@ -95,9 +95,9 @@ public void shutdown() throws Throwable
} }


@Override @Override
public Session newSession( boolean isEncrypted ) public Session newSession( String connectionDescriptor, boolean isEncrypted )
{ {
return new SessionStateMachine( usageData, gds, txBridge, statementRunner, logging, authentication ); return new SessionStateMachine( connectionDescriptor, usageData, gds, txBridge, statementRunner, logging, authentication );
} }


private Authentication authentication( DependencyResolver dependencyResolver ) private Authentication authentication( DependencyResolver dependencyResolver )
Expand Down
Expand Up @@ -40,6 +40,7 @@


class StandardStateMachineSPI implements SessionStateMachine.SPI class StandardStateMachineSPI implements SessionStateMachine.SPI
{ {
private final String connectionDescriptor;
private final UsageData usageData; private final UsageData usageData;
private final GraphDatabaseFacade db; private final GraphDatabaseFacade db;
private final StatementRunner statementRunner; private final StatementRunner statementRunner;
Expand All @@ -49,9 +50,10 @@ class StandardStateMachineSPI implements SessionStateMachine.SPI
private final ThreadToStatementContextBridge txBridge; private final ThreadToStatementContextBridge txBridge;
private final DecayingFlags featureUsage; private final DecayingFlags featureUsage;


StandardStateMachineSPI( UsageData usageData, GraphDatabaseFacade db, StatementRunner statementRunner, StandardStateMachineSPI( String connectionDescriptor, UsageData usageData, GraphDatabaseFacade db, StatementRunner statementRunner,
LogService logging, Authentication authentication, ThreadToStatementContextBridge txBridge ) LogService logging, Authentication authentication, ThreadToStatementContextBridge txBridge )
{ {
this.connectionDescriptor = connectionDescriptor;
this.usageData = usageData; this.usageData = usageData;
this.db = db; this.db = db;
this.statementRunner = statementRunner; this.statementRunner = statementRunner;
Expand All @@ -62,6 +64,12 @@ class StandardStateMachineSPI implements SessionStateMachine.SPI
this.authentication = authentication; this.authentication = authentication;
} }


@Override
public String connectionDescriptor()
{
return connectionDescriptor;
}

@Override @Override
public void reportError( Neo4jError err ) public void reportError( Neo4jError err )
{ {
Expand Down

0 comments on commit a5ea319

Please sign in to comment.