Skip to content

Commit

Permalink
Simplify DatabaseSchemaState hierarchy, collect label ids during plan…
Browse files Browse the repository at this point in the history
…ning.
  • Loading branch information
MishaDemianenko committed Jul 17, 2017
1 parent 8a8f47d commit 448f146
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 92 deletions.
Expand Up @@ -137,11 +137,11 @@ abstract class LogicalPlan
import org.neo4j.cypher.internal.frontend.v3_3.Foldable._
this.fold(Seq.empty[IndexUsage]) {
case NodeIndexSeek(idName, label, propertyKeys, _, _) =>
(acc) => acc :+ SchemaIndexSeekUsage(idName.name, label.name, propertyKeys.map(_.name))
(acc) => acc :+ SchemaIndexSeekUsage(idName.name, label.nameId.id, label.name, propertyKeys.map(_.name))
case NodeUniqueIndexSeek(idName, label, propertyKeys, _, _) =>
(acc) => acc :+ SchemaIndexSeekUsage(idName.name, label.name, propertyKeys.map(_.name))
(acc) => acc :+ SchemaIndexSeekUsage(idName.name, label.nameId.id, label.name, propertyKeys.map(_.name))
case NodeIndexScan(idName, label, propertyKey, _) =>
(acc) => acc :+ SchemaIndexScanUsage(idName.name, label.name, propertyKey.name)
(acc) => acc :+ SchemaIndexScanUsage(idName.name, label.nameId.id, label.name, propertyKey.name)
}
}
}
Expand Down Expand Up @@ -172,7 +172,7 @@ sealed trait IndexUsage {
def identifier:String
}

final case class SchemaIndexSeekUsage(identifier: String, label: String, propertyKeys: Seq[String]) extends IndexUsage
final case class SchemaIndexScanUsage(identifier: String, label: String, propertyKey: String) extends IndexUsage
final case class SchemaIndexSeekUsage(identifier: String, labelId : Int, label: String, propertyKeys: Seq[String]) extends IndexUsage
final case class SchemaIndexScanUsage(identifier: String, labelId : Int, label: String, propertyKey: String) extends IndexUsage
final case class LegacyNodeIndexUsage(identifier: String, index: String) extends IndexUsage
final case class LegacyRelationshipIndexUsage(identifier: String, index: String) extends IndexUsage
Expand Up @@ -30,13 +30,17 @@ import org.neo4j.cypher.internal.spi.v3_3.TransactionalContextWrapper
import org.neo4j.cypher.internal.tracing.{CompilationTracer, TimingCompilationTracer}
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.api.query.SchemaIndexUsage
import org.neo4j.kernel.api.security.AccessMode
import org.neo4j.kernel.api.{KernelAPI, ReadOperations}
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.locking.ResourceTypes
import org.neo4j.kernel.impl.query.{QueryExecutionMonitor, TransactionalContext}
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.kernel.{GraphDatabaseQueryService, api}
import org.neo4j.logging.{LogProvider, NullLogProvider}

import scala.collection.mutable
trait StringCacheMonitor extends CypherCacheMonitor[String, api.Statement]

/**
Expand Down Expand Up @@ -149,33 +153,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
// create transaction and query context
val tc = externalTransactionalContext.getOrBeginNewIfClosed()

// Temporarily change access mode during query planning
// NOTE: This will force read access mode if the current transaction did not have it
val revertable = tc.restrictCurrentTransaction(tc.securityContext.withMode(AccessMode.Static.READ))

val ((plan: ExecutionPlan, extractedParameters), touched) = try {
// fetch plan cache
val cache = getOrCreateFromSchemaState(tc.readOperations, {
cacheMonitor.cacheFlushDetected(tc.statement)
val lruCache = new LFUCache[String, (ExecutionPlan, Map[String, Any])](getPlanCacheSize)
new QueryCache(cacheAccessor, lruCache)
})

def isStale(plan: ExecutionPlan, ignored: Map[String, Any]) = plan.isStale(lastCommittedTxId, tc)
def producePlan() = {
val parsedQuery = parsePreParsedQuery(preParsedQuery, phaseTracer)
parsedQuery.plan(tc, phaseTracer)
}

cache.getOrElseUpdate(cacheKey, queryText, (isStale _).tupled, producePlan())
}
catch {
case (t: Throwable) =>
tc.close(success = false)
throw t
} finally {
revertable.close()
}
val (plan: ExecutionPlan, extractedParameters: Map[String, Any], touched: Boolean) = planQuery(queryText, phaseTracer, preParsedQuery, cacheKey, tc)

if (touched) {
tc.close(success = true)
Expand All @@ -192,6 +170,57 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
throw new IllegalStateException("Could not execute query due to insanely frequent schema changes")
}

private def planQuery(queryText: String, phaseTracer: CompilationTracer.QueryCompilationEvent, preParsedQuery: PreParsedQuery, cacheKey: String, tc: TransactionalContextWrapper) = {
// Temporarily change access mode during query planning
// NOTE: This will force read access mode if the current transaction did not have it
val revertable = tc.restrictCurrentTransaction(tc.securityContext.withMode(AccessMode.Static.READ))

val ((plan: ExecutionPlan, extractedParameters), touched) = try {
// fetch plan cache
val cache = getOrCreateFromSchemaState(tc.readOperations, {
cacheMonitor.cacheFlushDetected(tc.statement)
val lruCache = new LFUCache[String, (ExecutionPlan, Map[String, Any])](getPlanCacheSize)
new QueryCache(cacheAccessor, lruCache)
})

def isStale(plan: ExecutionPlan, ignored: Map[String, Any]) = plan.isStale(lastCommittedTxId, tc)

def producePlan() = {
import scala.collection.JavaConverters._

val readOperations = tc.statement.readOperations()
var replan : Boolean = true
var plan: ExecutionPlan = null
var map: Map[String, Any] = null
var labelIds : mutable.Seq[Int] = null
while (replan) {
val parsedQuery = parsePreParsedQuery(preParsedQuery, phaseTracer)
val tuple = parsedQuery.plan(tc, phaseTracer)
plan = tuple._1
map = tuple._2

labelIds = plan.plannerInfo.indexes().asScala.collect { case item: SchemaIndexUsage => item.getLabelId }
labelIds.foreach { readOperations.acquireShared(ResourceTypes.LABEL, _) }
// replan = !cache.isValid
if (replan) {
labelIds.foreach { readOperations.releaseShared(ResourceTypes.LABEL, _) }
}
}
(plan, map)
}

cache.getOrElseUpdate(cacheKey, queryText, (isStale _).tupled, producePlan())
}
catch {
case (t: Throwable) =>
tc.close(success = false)
throw t
} finally {
revertable.close()
}
(plan, extractedParameters, touched)
}

private def getOrCreateFromSchemaState[V](operations: ReadOperations, creator: => V) = {
val javaCreator = new java.util.function.Function[ExecutionEngine, V]() {
def apply(key: ExecutionEngine) = creator
Expand Down
Expand Up @@ -79,14 +79,14 @@ trait Compatibility[C <: CompilerContext] {
// Log notifications/warnings from planning
planImpl.notifications(planContext).foreach(notificationLogger.log)

(new ExecutionPlanWrapper(planImpl, preParsingNotifications), extractedParameters)
(new ExecutionPlanWrapper(planImpl, transactionalContext, preParsingNotifications ), extractedParameters)
}

override protected val trier: Try[BaseState] = preparedSyntacticQueryForV_3_2
}
}

class ExecutionPlanWrapper(inner: ExecutionPlan_v3_2, preParsingNotifications: Set[org.neo4j.graphdb.Notification])
class ExecutionPlanWrapper(inner: ExecutionPlan_v3_2, transactionalContext: TransactionalContextWrapperV3_3, preParsingNotifications: Set[org.neo4j.graphdb.Notification])
extends ExecutionPlan {

private val searchMonitor = kernelMonitors.newMonitor(classOf[IndexSearchMonitor])
Expand Down Expand Up @@ -121,8 +121,12 @@ trait Compatibility[C <: CompilerContext] {
override def plannerInfo: PlannerInfo = {
import scala.collection.JavaConverters._
new PlannerInfo(inner.plannerUsed.name, inner.runtimeUsed.name, inner.plannedIndexUsage.map {
case SchemaIndexSeekUsage(identifier, label, propertyKeys) => schemaIndexUsage(identifier, label, propertyKeys: _*)
case SchemaIndexScanUsage(identifier, label, propertyKey) => schemaIndexUsage(identifier, label, propertyKey)
case SchemaIndexSeekUsage(identifier, label, propertyKeys) =>
val labelId = transactionalContext.readOperations.labelGetForName(label)
schemaIndexUsage(identifier, labelId, label, propertyKeys: _*)
case SchemaIndexScanUsage(identifier, label, propertyKey) =>
val labelId = transactionalContext.readOperations.labelGetForName(label)
schemaIndexUsage(identifier, labelId, label, propertyKey)
case LegacyNodeIndexUsage(identifier, index) => legacyIndexUsage(identifier, "NODE", index)
case LegacyRelationshipIndexUsage(identifier, index) => legacyIndexUsage(identifier, "RELATIONSHIP", index)
}.asJava)
Expand Down
Expand Up @@ -188,8 +188,8 @@ trait Compatibility[CONTEXT <: CommunityRuntimeContext,

override def plannerInfo: PlannerInfo = {
new PlannerInfo(inner.plannerUsed.name, inner.runtimeUsed.name, inner.plannedIndexUsage.map {
case SchemaIndexSeekUsage(identifier, label, propertyKeys) => schemaIndexUsage(identifier, label, propertyKeys: _*)
case SchemaIndexScanUsage(identifier, label, propertyKey) => schemaIndexUsage(identifier, label, propertyKey)
case SchemaIndexSeekUsage(identifier, labelId, label, propertyKeys) => schemaIndexUsage(identifier, labelId, label, propertyKeys: _*)
case SchemaIndexScanUsage(identifier, labelId, label, propertyKey) => schemaIndexUsage(identifier, labelId, label, propertyKey)
case LegacyNodeIndexUsage(identifier, index) => legacyIndexUsage(identifier, "NODE", index)
case LegacyRelationshipIndexUsage(identifier, index) => legacyIndexUsage(identifier, "RELATIONSHIP", index)
}.asJava)
Expand Down
Expand Up @@ -52,9 +52,9 @@
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.ConstraintEnforcingEntityOperations;
import org.neo4j.kernel.impl.api.DataIntegrityValidatingStatementOperations;
import org.neo4j.kernel.impl.api.DatabaseSchemaState;
import org.neo4j.kernel.impl.api.GuardingStatementOperations;
import org.neo4j.kernel.impl.api.Kernel;
import org.neo4j.kernel.impl.api.KernelSchemaStateStore;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
import org.neo4j.kernel.impl.api.LegacyIndexProviderLookup;
Expand All @@ -68,7 +68,6 @@
import org.neo4j.kernel.impl.api.StatementOperationParts;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.UpdateableSchemaState;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.operations.QueryRegistrationOperations;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
Expand Down Expand Up @@ -431,7 +430,7 @@ public void start() throws IOException
StorageEngine storageEngine = null;
try
{
UpdateableSchemaState updateableSchemaState = new KernelSchemaStateStore( logProvider );
DatabaseSchemaState databaseSchemaState = new DatabaseSchemaState( logProvider );

SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering = new SynchronizedArrayIdOrderingQueue( 20 );

Expand All @@ -440,7 +439,7 @@ public void start() throws IOException

storageEngine = buildStorageEngine(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup,
indexConfigStore, updateableSchemaState, legacyIndexTransactionOrdering );
indexConfigStore, databaseSchemaState, legacyIndexTransactionOrdering );

LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory(), STRICT );
Expand Down Expand Up @@ -469,7 +468,7 @@ public void start() throws IOException
transactionLogModule.transactionAppender(),
dependencies.resolveDependency( IndexingService.class ),
storageEngine.storeReadLayer(),
updateableSchemaState,
databaseSchemaState,
dependencies.resolveDependency( LabelScanStore.class ),
storageEngine,
indexConfigStore,
Expand All @@ -486,7 +485,7 @@ public void start() throws IOException
this.kernelModule = kernelModule;

dependencies.satisfyDependency( this );
dependencies.satisfyDependency( updateableSchemaState );
dependencies.satisfyDependency( databaseSchemaState );
dependencies.satisfyDependency( storageEngine.storeReadLayer() );
dependencies.satisfyDependency( logEntryReader );
dependencies.satisfyDependency( storageEngine );
Expand Down Expand Up @@ -708,7 +707,7 @@ public void recoveryCompleted( int numberOfRecoveredTransactions )
private NeoStoreKernelModule buildKernel( TransactionAppender appender,
IndexingService indexingService,
StoreReadLayer storeLayer,
UpdateableSchemaState updateableSchemaState, LabelScanStore labelScanStore,
DatabaseSchemaState databaseSchemaState, LabelScanStore labelScanStore,
StorageEngine storageEngine,
IndexConfigStore indexConfigStore,
TransactionIdStore transactionIdStore,
Expand All @@ -733,7 +732,7 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,

StatementOperationContainer statementOperationContainer = dependencies.satisfyDependency(
buildStatementOperations( storeLayer, autoIndexing,
constraintIndexCreator, updateableSchemaState, guard, legacyIndexStore ) );
constraintIndexCreator, databaseSchemaState, guard, legacyIndexStore ) );

TransactionHooks hooks = new TransactionHooks();
KernelTransactions kernelTransactions = life.add( new KernelTransactions( statementLocksFactory,
Expand Down Expand Up @@ -848,7 +847,7 @@ public DependencyResolver getDependencyResolver()

private StatementOperationContainer buildStatementOperations(
StoreReadLayer storeReadLayer, AutoIndexing autoIndexing,
ConstraintIndexCreator constraintIndexCreator, UpdateableSchemaState updateableSchemaState,
ConstraintIndexCreator constraintIndexCreator, DatabaseSchemaState databaseSchemaState,
Guard guard, LegacyIndexStore legacyIndexStore )
{
// The passed in StoreReadLayer is the bottom most layer: Read-access to committed data.
Expand All @@ -863,7 +862,7 @@ private StatementOperationContainer buildStatementOperations(

StatementOperationParts parts = new StatementOperationParts( stateHandlingContext, stateHandlingContext,
stateHandlingContext, stateHandlingContext, stateHandlingContext, stateHandlingContext,
new SchemaStateConcern( updateableSchemaState ), null, stateHandlingContext, stateHandlingContext,
new SchemaStateConcern( databaseSchemaState ), null, stateHandlingContext, stateHandlingContext,
stateHandlingContext, queryRegistrationOperations );
// + Constraints
ConstraintEnforcingEntityOperations constraintEnforcingEntityOperations =
Expand Down
Expand Up @@ -23,9 +23,9 @@

public abstract class IndexUsage
{
public static IndexUsage schemaIndexUsage( String identifier, String label, String... propertyKeys )
public static IndexUsage schemaIndexUsage( String identifier, int labelId, String label, String... propertyKeys )
{
return new SchemaIndexUsage( identifier, label, propertyKeys );
return new SchemaIndexUsage( identifier, labelId, label, propertyKeys );
}

public static IndexUsage legacyIndexUsage( String identifier, String entityType, String index )
Expand Down
Expand Up @@ -22,25 +22,33 @@
import java.util.HashMap;
import java.util.Map;

class SchemaIndexUsage extends IndexUsage
public class SchemaIndexUsage extends IndexUsage
{
private final String label;
private final String[] propertyKeys;
private final int labelId;

SchemaIndexUsage( String identifier, String label, String[] propertyKeys )
SchemaIndexUsage( String identifier, int labelId, String label, String[] propertyKeys )
{
super( identifier );
this.label = label;
this.labelId = labelId;
this.propertyKeys = propertyKeys;
}

public int getLabelId()
{
return labelId;
}

public Map<String,String> asMap()
{
Map<String,String> map = new HashMap<>();
map.put( "indexType", "SCHEMA INDEX" );
map.put( "entityType", "NODE" );
map.put( "identifier", identifier );
map.put( "label", label );
map.put( "labelId", String.valueOf( labelId ) );
for ( int i = 0; i < propertyKeys.length; i++ )
{
String key = (propertyKeys.length > 1) ? "propertyKey_" + i : "propertyKey";
Expand Down
Expand Up @@ -33,14 +33,14 @@
* Schema state is transient state that should be invalidated when the schema changes.
* Examples of things stored in schema state is execution plans for cypher.
*/
public class KernelSchemaStateStore implements UpdateableSchemaState
public class DatabaseSchemaState implements SchemaState
{
private Map<Object, Object> state;

private final Log log;
private final ReadWriteLock lock = new ReentrantReadWriteLock( true );

public KernelSchemaStateStore( LogProvider logProvider )
public DatabaseSchemaState( LogProvider logProvider )
{
this.state = new HashMap<>( );
this.log = logProvider.getLog( getClass() );
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.api;

import java.util.Map;
import java.util.function.Function;

public interface SchemaState
Expand All @@ -27,5 +28,9 @@ public interface SchemaState

<K, V> V getOrCreate( K key, Function<K, V> creator );

void replace( Map<Object, Object> updates );

<K, V> void apply( Map<K, V> updates );

void clear();
}
Expand Up @@ -25,9 +25,9 @@

public class SchemaStateConcern implements SchemaStateOperations
{
private final UpdateableSchemaState schemaState;
private final SchemaState schemaState;

public SchemaStateConcern( UpdateableSchemaState schemaState )
public SchemaStateConcern( SchemaState schemaState )
{
this.schemaState = schemaState;
}
Expand Down

This file was deleted.

0 comments on commit 448f146

Please sign in to comment.