Skip to content

Commit

Permalink
Added logic to check that the database is up to date wrt to a given txId
Browse files Browse the repository at this point in the history
What it is missing in this commit is that we need to track ids during
writes in order to maintaince the version up to date.
  • Loading branch information
davidegrohmann committed Jul 14, 2016
1 parent bc2dc3b commit fb7a60b
Show file tree
Hide file tree
Showing 22 changed files with 545 additions and 227 deletions.
116 changes: 67 additions & 49 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java
Expand Up @@ -31,6 +31,8 @@
import java.util.List;
import java.util.function.BiFunction;

import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.security.auth.BasicAuthentication;
import org.neo4j.bolt.security.ssl.Certificates;
import org.neo4j.bolt.security.ssl.KeyStoreFactory;
import org.neo4j.bolt.security.ssl.KeyStoreInformation;
Expand All @@ -57,15 +59,17 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings.BoltConnector;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.Service;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.api.bolt.SessionTracker;
import org.neo4j.kernel.api.security.AuthManager;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Internal;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -96,15 +100,15 @@ public static class Settings

@Internal
@Description( "Path to the X.509 public certificate to be used by Neo4j for TLS connections" )
public static Setting<File> tls_certificate_file = derivedSetting(
"unsupported.dbms.security.tls_certificate_file", certificates_directory,
( certificates ) -> new File( certificates, "neo4j.cert" ), PATH );
public static Setting<File> tls_certificate_file =
derivedSetting( "unsupported.dbms.security.tls_certificate_file", certificates_directory,
( certificates ) -> new File( certificates, "neo4j.cert" ), PATH );

@Internal
@Description( "Path to the X.509 private key to be used by Neo4j for TLS connections" )
public static final Setting<File> tls_key_file = derivedSetting(
"unsupported.dbms.security.tls_key_file", certificates_directory,
(certificates ) -> new File( certificates, "neo4j.key" ), PATH );
public static final Setting<File> tls_key_file =
derivedSetting( "unsupported.dbms.security.tls_key_file", certificates_directory,
( certificates ) -> new File( certificates, "neo4j.key" ), PATH );
}

public interface Dependencies
Expand All @@ -124,6 +128,10 @@ public interface Dependencies
ThreadToStatementContextBridge txBridge();

SessionTracker sessionTracker();

NeoStoreDataSource dataSource();

AuthManager authManager();
}

public BoltKernelExtension()
Expand All @@ -136,22 +144,26 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
{
final Config config = dependencies.config();
final GraphDatabaseService gdb = dependencies.db();
final GraphDatabaseFacade api = (GraphDatabaseFacade) gdb;
final LogService logging = dependencies.logService();
final Log log = logging.getInternalLog( Sessions.class );
final GraphDatabaseAPI api = (GraphDatabaseAPI) gdb;
final LogService logService = dependencies.logService();
final Log log = logService.getInternalLog( Sessions.class );

final LifeSupport life = new LifeSupport();

final JobScheduler scheduler = dependencies.scheduler();

Netty4LogBridge.setLogProvider( logging.getInternalLogProvider() );
Netty4LogBridge.setLogProvider( logService.getInternalLogProvider() );

Authentication authentication = authentication( dependencies.config(), dependencies.authManager(), logService );

StandardSessions standardSessions =
new StandardSessions( api, dependencies.usageData(), logService, dependencies.txBridge(),
authentication, dependencies.dataSource(), dependencies.sessionTracker() );
Sessions sessions =
new MonitoredSessions( dependencies.monitors(),
new ThreadedSessions(
life.add( new StandardSessions( api, dependencies.usageData(), logging,
dependencies.txBridge(), dependencies.sessionTracker() ) ),
scheduler, logging ), Clock.systemUTC() );
life.add( standardSessions ),
scheduler, logService ), Clock.systemUTC() );

List<ProtocolInitializer> connectors = config
.view( enumerate( GraphDatabaseSettings.Connector.class ) )
Expand All @@ -164,33 +176,32 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
boolean requireEncryption = false;
switch ( config.get( connConfig.encryption_level ) )
{
// self signed cert should be generated when encryption is REQUIRED or OPTIONAL on the server
// while no cert is generated if encryption is DISABLED
case REQUIRED:
requireEncryption = true;
// no break here
case OPTIONAL:
sslCtx = createSslContext( config, log, address );
break;
default:
// case DISABLED:
sslCtx = null;
break;
// self signed cert should be generated when encryption is REQUIRED or OPTIONAL on the server
// while no cert is generated if encryption is DISABLED
case REQUIRED:
requireEncryption = true;
// no break here
case OPTIONAL:
sslCtx = createSslContext( config, log, address );
break;
default:
// case DISABLED:
sslCtx = null;
break;
}

return new SocketTransport( address, sslCtx, logging.getInternalLogProvider(),
newVersions( logging, requireEncryption ?
new EncryptionRequiredSessions( sessions ) : sessions ) );
} )
.collect( toList() );
return new SocketTransport( address, sslCtx, logService.getInternalLogProvider(),
newVersions( logService,
requireEncryption ? new EncryptionRequiredSessions( sessions ) : sessions ) );
} ).collect( toList() );

if ( connectors.size() > 0 && !config.get( GraphDatabaseSettings.disconnected ) )
{
life.add( new NettyServer( scheduler.threadFactory( boltNetworkIO ), connectors ) );
log.info( "Bolt Server extension loaded." );
for ( ProtocolInitializer connector : connectors )
{
logging.getUserLog( Sessions.class ).info( "Bolt enabled on %s.", connector.address() );
logService.getUserLog( Sessions.class ).info( "Bolt enabled on %s.", connector.address() );
}
}

Expand All @@ -202,30 +213,25 @@ private SslContext createSslContext( Config config, Log log, HostnamePort addres
try
{
KeyStoreInformation keyStore = createKeyStore( config, log, address );
return SslContextBuilder
.forServer( keyStore.getCertificatePath(), keyStore.getPrivateKeyPath() )
.build();
return SslContextBuilder.forServer( keyStore.getCertificatePath(), keyStore.getPrivateKeyPath() ).build();
}
catch(IOException | OperatorCreationException | GeneralSecurityException e )
catch ( IOException | OperatorCreationException | GeneralSecurityException e )
{
throw new RuntimeException( "Failed to initilize SSL encryption support, which is required to start this " +
"connector. Error was: " + e.getMessage(), e );
throw new RuntimeException( "Failed to initialize SSL encryption support, which is required to start " +
"this connector. Error was: " + e.getMessage(), e );
}
}

private PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> newVersions( LogService logging,
Sessions sessions )
{
PrimitiveLongObjectMap<BiFunction<Channel,Boolean,BoltProtocol>> availableVersions = longObjectMap();
availableVersions.put(
BoltProtocolV1.VERSION,
( channel, isEncrypted ) -> {
String descriptor = format( "\tclient%s\tserver%s", channel.remoteAddress(), channel.localAddress() );
ChunkedOutput output = new ChunkedOutput( channel, 8192 );
return new BoltProtocolV1( logging, sessions.newSession( descriptor, isEncrypted ),
new PackStreamMessageFormatV1.Writer( new Neo4jPack.Packer( output ), output ) );
}
);
availableVersions.put( BoltProtocolV1.VERSION, ( channel, isEncrypted ) -> {
String descriptor = format( "\tclient%s\tserver%s", channel.remoteAddress(), channel.localAddress() );
ChunkedOutput output = new ChunkedOutput( channel, 8192 );
return new BoltProtocolV1( logging, sessions.newSession( descriptor, isEncrypted ),
new PackStreamMessageFormatV1.Writer( new Neo4jPack.Packer( output ), output ) );
} );
return availableVersions;
}

Expand All @@ -246,8 +252,7 @@ private KeyStoreInformation createKeyStore( Configuration config, Log log, Hostn
{
throw new IllegalStateException(
format( "TLS private key found, but missing certificate at '%s'. Cannot start server without " +
"certificate.",
certificatePath ) );
"certificate.", certificatePath ) );
}
if ( !privateKeyPath.exists() )
{
Expand All @@ -258,4 +263,17 @@ private KeyStoreInformation createKeyStore( Configuration config, Log log, Hostn

return new KeyStoreFactory().createKeyStore( privateKeyPath, certificatePath );
}

private Authentication authentication( Config config, AuthManager authManager, LogService logService )
{

if ( config.get( GraphDatabaseSettings.auth_enabled ) )
{
return new BasicAuthentication( authManager, logService.getInternalLogProvider() );
}
else
{
return Authentication.NONE;
}
}
}
Expand Up @@ -45,8 +45,6 @@ public class BasicAuthentication implements Authentication
private static final String SCHEME = "basic";
private final Log log;

private AuthSubject authSubject;

public BasicAuthentication( AuthManager authManager, LogProvider logProvider )
{
this.authManager = authManager;
Expand Down Expand Up @@ -111,7 +109,6 @@ private AuthenticationResult update( Map<String,Object> authToken ) throws Authe
{
case SUCCESS:
case PASSWORD_CHANGE_REQUIRED:
String username = AuthToken.safeCast( PRINCIPAL, authToken );
String newPassword = AuthToken.safeCast( NEW_CREDENTIALS, authToken );
authSubject.setPassword( newPassword );
break;
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.bolt.v1.runtime.StatementMetadata;
import org.neo4j.bolt.v1.runtime.internal.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.logging.Log;

/** Bridges the gap between incoming deserialized messages, the user environment and back. */
Expand Down Expand Up @@ -58,8 +59,9 @@ public TransportBridge( Log log, Session session, MessageHandler<IOException> ou
@Override
public void handleInitMessage( String clientName, Map<String,Object> authToken ) throws RuntimeException
{
session.init( clientName, authToken, null, initCallback );

// TODO: make the client transmit the version for now it is hardcoded to BASE_TX_ID to ensure current behaviour
long baseDBVersion = TransactionIdStore.BASE_TX_ID;
session.init( clientName, authToken, baseDBVersion, null, initCallback );
}

@Override
Expand Down
Expand Up @@ -83,10 +83,11 @@ public String connectionDescriptor()
}

@Override
public <A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback )
public <A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion,
A attachment, Callback<Boolean,A> callback )
{
monitor.messageReceived();
delegate.init( clientName, authToken, attachment, withMonitor( callback ) );
delegate.init( clientName, authToken, baseDBVersion, attachment, withMonitor( callback ) );
}

@Override
Expand Down
Expand Up @@ -54,7 +54,9 @@ interface Callback<V, A>
{
};

static <V,A> Callback<V,A> noOp() {
static <V,A> Callback<V,A> noOp()
{
//noinspection unchecked
return NO_OP;
}

Expand Down Expand Up @@ -129,7 +131,7 @@ public static <V, A> Callback<V,A> noop()
/**
* Initialize the session.
*/
<A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback );
<A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion, A attachment, Callback<Boolean,A> callback );

/**
* Run a statement, yielding a result stream which can be retrieved through pulling it in a subsequent call.
Expand Down
Expand Up @@ -62,7 +62,8 @@ private <V, A> void reportError( A attachment, Callback<V,A> callback )
}

@Override
public <A> void init( String clientName, Map<String,Object> authToken, A attachment, Callback<Boolean,A> callback )
public <A> void init( String clientName, Map<String,Object> authToken, long baseDBVersion,
A attachment, Callback<Boolean,A> callback )
{
reportError( attachment, callback );
}
Expand Down

0 comments on commit fb7a60b

Please sign in to comment.