Skip to content

Commit

Permalink
Introduces a thin abstraction between GraphDatabaseFacade and Kernel
Browse files Browse the repository at this point in the history
components. The purpose is to allow alternate GraphDatabaseService instances
without having to re-implement the whole API.

With this change, the Procedures work can provide users a GDS instance that
can easily be tweaked to only allow reads and to disable #shutdown(), for
instance.

This seemed like a useful way to both significantly simply the Procedures
implementation, while also improving compartmentalization. It's by no means
done or perfect, but a nice step towards separating the Core API and Kernel.

Before:

    +----------+    +--------+
    | Graph    |    | Kernel |
    | Database +--->+ Stuff  |
    | Facade   |    |        |
    +----------+    +--------+

After:

    +----------+    +--------+    +--------+
    | Graph    |    | Spiffy |    | Kernel |
    | Database +--->+ SPI    +--->+ Stuff  |
    | Facade   |    |        |    |        |
    +----------+    +--------+    +--------+

Also includes:

o Fix Cypher compilation and ExecutionResultTest glitches
o Fix LazyTest
o Fix compilation errors by using KernelTransaction instead of Transaction or TopLevelTransaction in a couple of places
o Fix SessionStateMachineTest
  • Loading branch information
jakewins committed Jan 20, 2016
1 parent 7fdd6af commit 7847506
Show file tree
Hide file tree
Showing 62 changed files with 874 additions and 688 deletions.
Expand Up @@ -19,6 +19,11 @@
*/
package org.neo4j.bolt;

import io.netty.channel.Channel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.bouncycastle.operator.OperatorCreationException;

import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;
Expand All @@ -27,11 +32,6 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import io.netty.channel.Channel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.bouncycastle.operator.OperatorCreationException;

import org.neo4j.bolt.security.ssl.Certificates;
import org.neo4j.bolt.security.ssl.KeyStoreFactory;
import org.neo4j.bolt.security.ssl.KeyStoreInformation;
Expand Down Expand Up @@ -90,8 +90,7 @@ public static class Settings

@Description( "Set the encryption level for Neo4j Bolt protocol ports" )
public static final Setting<EncryptionLevel> tls_level =
setting( "tls.level", options( EncryptionLevel.class ),
OPTIONAL.name() );
setting( "tls.level", options( EncryptionLevel.class ), OPTIONAL.name() );

@Description( "Host and port for the Neo4j Bolt Protocol" )
public static final Setting<HostnamePort> socket_address =
Expand Down
Expand Up @@ -26,7 +26,8 @@
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.TopLevelTransaction;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.coreapi.TopLevelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService;
Expand Down Expand Up @@ -80,7 +81,8 @@ public State beginTransaction( SessionStateMachine ctx )
{
assert ctx.currentTransaction == null;
ctx.implicitTransaction = false;
ctx.currentTransaction = ctx.db.beginTx();
ctx.db.beginTx();
ctx.currentTransaction = ctx.txBridge.getKernelTransactionBoundToThisThread( false );
return IN_TRANSACTION;
}

Expand All @@ -104,7 +106,8 @@ public State beginImplicitTransaction( SessionStateMachine ctx )
{
assert ctx.currentTransaction == null;
ctx.implicitTransaction = true;
ctx.currentTransaction = ctx.db.beginTx();
ctx.db.beginTx();
ctx.currentTransaction = ctx.txBridge.getKernelTransactionBoundToThisThread( false );
return IN_TRANSACTION;
}

Expand Down Expand Up @@ -153,7 +156,7 @@ public State rollbackTransaction( SessionStateMachine ctx )
{
try
{
Transaction tx = ctx.currentTransaction;
KernelTransaction tx = ctx.currentTransaction;
ctx.currentTransaction = null;

tx.failure();
Expand Down Expand Up @@ -427,7 +430,7 @@ public String[] fieldNames()
private RecordStream currentResult;

/** The current transaction, if present */
private Transaction currentTransaction;
private KernelTransaction currentTransaction;

/** Callback poised to receive the next response */
private Callback currentCallback;
Expand Down Expand Up @@ -584,7 +587,7 @@ private void before( Object attachment, Callback cb )
{
if ( hasTransaction() )
{
txBridge.bindTransactionToCurrentThread( (TopLevelTransaction) currentTransaction );
txBridge.bindTransactionToCurrentThread( currentTransaction );
}
assert this.currentCallback == null;
assert this.currentAttachment == null;
Expand Down
Expand Up @@ -31,7 +31,8 @@
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.TopLevelTransaction;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.coreapi.TopLevelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.NullLogService;
Expand All @@ -53,6 +54,7 @@ public class SessionStateMachineTest
private final GraphDatabaseService db = mock( GraphDatabaseService.class );
private final ThreadToStatementContextBridge txBridge = mock( ThreadToStatementContextBridge.class );
private final Transaction tx = mock( TopLevelTransaction.class );
private final KernelTransaction ktx = mock( KernelTransaction.class );
private final UsageData usageData = new UsageData();
private final StatementRunner runner = mock( StatementRunner.class );
private final SessionStateMachine machine = new SessionStateMachine(
Expand Down Expand Up @@ -82,8 +84,8 @@ public void shouldRollbackOpenTransactionOnRollbackInducingError() throws Throwa

// Then
assertThat( machine.state(), CoreMatchers.equalTo( SessionStateMachine.State.ERROR ) );
verify( tx ).failure();
verify( tx ).close();
verify( ktx ).failure();
verify( ktx ).close();

// And when
machine.reset( null, Session.Callback.NO_OP );
Expand All @@ -109,8 +111,8 @@ public void shouldNotLeaveTransactionOpenOnClientErrors() throws Throwable

// Then
assertThat( machine.state(), equalTo( SessionStateMachine.State.ERROR ) );
verify(tx).failure();
verify(tx).close();
verify( ktx ).failure();
verify( ktx ).close();

// And when
machine.reset( null, Session.Callback.NO_OP );
Expand All @@ -130,7 +132,7 @@ public void shouldStopRunningTxOnHalt() throws Throwable
// Then
assertThat( machine.state(), CoreMatchers.equalTo( SessionStateMachine.State.STOPPED ) );
verify( db ).beginTx();
verify( tx ).close();
verify( ktx ).close();
}

@Test
Expand Down Expand Up @@ -236,6 +238,7 @@ public void shouldResetToIdleOnError() throws Throwable
public void setup()
{
when( db.beginTx() ).thenReturn( tx );
when( txBridge.getKernelTransactionBoundToThisThread( false ) ).thenReturn( ktx );
}

static class TestCallback<V> extends Session.Callback.Adapter<V, Object>
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.kernel.TopLevelTransaction;
import org.neo4j.kernel.impl.coreapi.TopLevelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.NullLogService;
Expand Down
Expand Up @@ -30,6 +30,7 @@ import org.neo4j.cypher.internal.{CypherCompiler, _}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade
import org.neo4j.kernel.impl.query.{QueryEngineProvider, QueryExecutionMonitor, QuerySession}
Expand Down Expand Up @@ -251,7 +252,9 @@ class ExecutionEngine(graph: GraphDatabaseService, logProvider: LogProvider = Nu
}
optGraphAs[GraphDatabaseFacade]
.andThen(g => {
Option(g.platformModule.config.get(setting))
// TODO: Config should be passed in as a dependency to Cypher, not pulled out of casted interfaces
val config: Config = g.getDependencyResolver.resolveDependency(classOf[Config])
Option(config.get(setting))
})
.andThen(_.getOrElse(defaultValue))
.applyOrElse(graph, (_: GraphDatabaseService) => defaultValue)
Expand Down
Expand Up @@ -25,7 +25,9 @@ import org.neo4j.cypher.{InvalidArgumentException, SyntaxException, _}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.helpers.Clock
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.api.KernelAPI
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.{Log, LogProvider}
Expand Down Expand Up @@ -132,29 +134,30 @@ class CypherCompiler(graph: GraphDatabaseService,
}
}

private def getQueryCacheSize : Int =
optGraphAs[GraphDatabaseFacade]
.andThen(_.platformModule.config.get(GraphDatabaseSettings.query_cache_size).intValue())
.applyOrElse(graph, (_: GraphDatabaseService) => DEFAULT_QUERY_CACHE_SIZE)
private def getQueryCacheSize : Int = {
val setting: (Config) => Int = config => config.get(GraphDatabaseSettings.query_cache_size).intValue()
getSetting(graph, setting, DEFAULT_QUERY_CACHE_SIZE)
}


private def getStatisticsDivergenceThreshold : Double =
optGraphAs[GraphDatabaseFacade]
.andThen(_.platformModule.config.get(GraphDatabaseSettings.query_statistics_divergence_threshold).doubleValue())
.applyOrElse(graph, (_: GraphDatabaseService) => DEFAULT_STATISTICS_DIVERGENCE_THRESHOLD)
private def getStatisticsDivergenceThreshold : Double = {
val setting: (Config) => Double = config => config.get(GraphDatabaseSettings.query_statistics_divergence_threshold).doubleValue()
getSetting(graph, setting, DEFAULT_STATISTICS_DIVERGENCE_THRESHOLD)
}

private def getNonIndexedLabelWarningThreshold: Long =
optGraphAs[GraphDatabaseFacade]
.andThen(_.platformModule.config.get(GraphDatabaseSettings.query_non_indexed_label_warning_threshold).longValue())
.applyOrElse(graph, (_: GraphDatabaseService) => DEFAULT_NON_INDEXED_LABEL_WARNING_THRESHOLD)
private def getNonIndexedLabelWarningThreshold: Long = {
val setting: (Config) => Long = config => config.get(GraphDatabaseSettings.query_non_indexed_label_warning_threshold).longValue()
getSetting(graph, setting, DEFAULT_NON_INDEXED_LABEL_WARNING_THRESHOLD)
}

private def getMinimumTimeBeforeReplanning: Long = {
optGraphAs[GraphDatabaseFacade]
.andThen(_.platformModule.config.get(GraphDatabaseSettings.cypher_min_replan_interval).longValue())
.applyOrElse(graph, (_: GraphDatabaseService) => DEFAULT_QUERY_PLAN_TTL)
val setting: (Config) => Long = config => config.get(GraphDatabaseSettings.cypher_min_replan_interval).longValue()
getSetting(graph, setting, DEFAULT_QUERY_PLAN_TTL)
}

private def optGraphAs[T <: GraphDatabaseService : Manifest]: PartialFunction[GraphDatabaseService, T] = {
case (db: T) => db
private def getSetting[A](gds: GraphDatabaseService, configLookup: Config => A, default: A): A = gds match {
// TODO: Cypher should not be pulling out components from casted interfaces, it should ask for Config as a dep
case (gdbApi:GraphDatabaseAPI) => configLookup(gdbApi.getDependencyResolver.resolveDependency(classOf[Config]))
case _ => default
}
}
Expand Up @@ -33,7 +33,9 @@
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.TopLevelTransaction;
import org.neo4j.test.ImpermanentDatabaseRule;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -226,11 +228,11 @@ private void createNode()
}
}

private org.neo4j.kernel.TopLevelTransaction activeTransaction()
private TopLevelTransaction activeTransaction()
{
ThreadToStatementContextBridge bridge = db.getDependencyResolver().resolveDependency(
ThreadToStatementContextBridge.class );
return bridge.getTopLevelTransactionBoundToThisThread( false );
KernelTransaction kernelTransaction = bridge.getTopLevelTransactionBoundToThisThread( false );
return kernelTransaction == null ? null : new TopLevelTransaction( kernelTransaction, null );
}

}
Expand Up @@ -30,8 +30,11 @@ import org.neo4j.graphdb._
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.io.fs.FileUtils
import org.neo4j.kernel.{NeoStoreDataSource, TopLevelTransaction}
import org.neo4j.kernel.{NeoStoreDataSource}
import org.neo4j.test.TestGraphDatabaseFactory
import org.neo4j.kernel.NeoStoreDataSource
import org.neo4j.kernel.impl.coreapi.TopLevelTransaction
import org.neo4j.test.{ImpermanentGraphDatabase, TestGraphDatabaseFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down
Expand Up @@ -42,6 +42,7 @@ import org.neo4j.graphdb._
import org.neo4j.helpers.collection.Iterables.asResourceIterable
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.api.{ReadOperations, Statement}
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.api.OperationsFacade
import org.neo4j.kernel.impl.core.{NodeManager, NodeProxy, ThreadToStatementContextBridge}
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore
Expand Down Expand Up @@ -190,6 +191,7 @@ class LazyTest extends ExecutionEngineFunSuite {
val dependencies = mock[DependencyResolver]
val bridge = mock[ThreadToStatementContextBridge]
val monitors = new org.neo4j.kernel.monitoring.Monitors()
val config = new Config()

val fakeDataStatement = mock[OperationsFacade]
val fakeReadStatement = mock[ReadOperations]
Expand All @@ -208,6 +210,7 @@ class LazyTest extends ExecutionEngineFunSuite {
when(dependencies.resolveDependency(classOf[NodeManager])).thenReturn(nodeManager)
when(dependencies.resolveDependency(classOf[TransactionIdStore])).thenReturn(idStore)
when(dependencies.resolveDependency(classOf[org.neo4j.kernel.monitoring.Monitors])).thenReturn(monitors)
when(dependencies.resolveDependency(classOf[Config])).thenReturn(config)
when(fakeGraph.beginTx()).thenReturn(tx)
val n0 = mock[Node]
val n1 = mock[Node]
Expand Down
Expand Up @@ -129,7 +129,7 @@ private void resolveMonitor( Node node )
GraphDatabaseService service = node.getGraphDatabase();
if ( service instanceof GraphDatabaseFacade )
{
Monitors monitors = ((GraphDatabaseFacade) service).platformModule.monitors;
Monitors monitors = ((GraphDatabaseFacade) service).getDependencyResolver().resolveDependency( Monitors.class );
dataMonitor = monitors.newMonitor( DataMonitor.class );
}
}
Expand Down
Expand Up @@ -47,6 +47,7 @@
import static org.neo4j.kernel.configuration.Settings.NO_DEFAULT;
import static org.neo4j.kernel.configuration.Settings.PATH;
import static org.neo4j.kernel.configuration.Settings.STRING;
import static org.neo4j.kernel.configuration.Settings.STRING_LIST;
import static org.neo4j.kernel.configuration.Settings.TRUE;
import static org.neo4j.kernel.configuration.Settings.illegalValueMessage;
import static org.neo4j.kernel.configuration.Settings.list;
Expand Down Expand Up @@ -200,7 +201,7 @@ public abstract class GraphDatabaseSettings
"only.")
@Internal
@Deprecated
public static final Setting<String> node_keys_indexable = setting("node_keys_indexable", STRING, NO_DEFAULT, illegalValueMessage( "must be a comma-separated list of keys to be indexed", matches( ANY ) ) );
public static final Setting<List<String>> node_keys_indexable = setting("node_keys_indexable", STRING_LIST, "" );

@Description("Controls the auto indexing feature for relationships. Setting it to `false` shuts it down, " +
"while `true` enables it by default for properties "
Expand All @@ -214,7 +215,7 @@ public abstract class GraphDatabaseSettings
"_relationships_ only." )
@Internal
@Deprecated
public static final Setting<String> relationship_keys_indexable = setting("relationship_keys_indexable", STRING, NO_DEFAULT, illegalValueMessage( "must be a comma-separated list of keys to be indexed", matches( ANY ) ) );
public static final Setting<List<String>> relationship_keys_indexable = setting("relationship_keys_indexable", STRING_LIST, "" );

// Index sampling
@Description("Enable or disable background index sampling")
Expand Down
Expand Up @@ -206,7 +206,7 @@ public void notify( AvailabilityListener listener )
}
}

private static enum Availability
private enum Availability
{
AVAILABLE,
UNAVAILABLE,
Expand All @@ -223,6 +223,15 @@ public boolean isAvailable()
return availability() == Availability.AVAILABLE;
}


/**
* Check if the database has been shut down.
*/
public boolean isShutdown()
{
return availability() == Availability.SHUTDOWN;
}

/**
* Check if the database is available for transactions to use.
*
Expand Down

0 comments on commit 7847506

Please sign in to comment.