Skip to content

Commit

Permalink
moves RuntimeEnvironment up to compiler factory
Browse files Browse the repository at this point in the history
  • Loading branch information
alexaverbuch committed Aug 14, 2018
1 parent 0f64de1 commit 1de9adc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package org.neo4j.cypher.internal

import java.time.Clock

import org.neo4j.cypher.internal.MorselRuntime.MorselRuntimeState
import org.neo4j.cypher.internal.compatibility.v3_4.Cypher34Planner
import org.neo4j.cypher.internal.compatibility.v3_5.Cypher35Planner
import org.neo4j.cypher.internal.compatibility.{CypherCurrentCompiler, CypherPlanner, CypherRuntimeConfiguration, RuntimeContext, RuntimeContextCreator}
Expand All @@ -33,12 +32,14 @@ import org.neo4j.cypher.internal.executionplan.GeneratedQuery
import org.neo4j.cypher.internal.planner.v3_5.spi.TokenContext
import org.neo4j.cypher.internal.runtime.compiled.codegen.spi.CodeStructure
import org.neo4j.cypher.internal.runtime.interpreted.LastCommittedTxIdProvider
import org.neo4j.cypher.internal.runtime.parallel._
import org.neo4j.cypher.internal.runtime.vectorized.Dispatcher
import org.neo4j.cypher.internal.spi.codegen.GeneratedQueryStructure
import org.neo4j.cypher.{CypherPlannerOption, CypherRuntimeOption, CypherUpdateStrategy, CypherVersion}
import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.{Log, LogProvider}
import org.neo4j.scheduler.JobScheduler
import org.neo4j.scheduler.{Group, JobScheduler}
import org.opencypher.v9_0.frontend.phases.InternalNotificationLogger

class EnterpriseCompilerFactory(community: CommunityCompilerFactory,
Expand All @@ -53,7 +54,7 @@ class EnterpriseCompilerFactory(community: CommunityCompilerFactory,
Each compiler contains a runtime instance, and each morsel runtime instance requires a dispatcher instance.
This ensures only one (shared) dispatcher/tracer instance is created, even when there are multiple morsel runtime instances.
*/
private val morselRuntimeState = MorselRuntimeState(runtimeConfig, graph.getDependencyResolver.resolveDependency(classOf[JobScheduler]))
private val runtimeEnvironment = RuntimeEnvironment(runtimeConfig, graph.getDependencyResolver.resolveDependency(classOf[JobScheduler]))

override def createCompiler(cypherVersion: CypherVersion,
cypherPlanner: CypherPlannerOption,
Expand Down Expand Up @@ -89,14 +90,47 @@ class EnterpriseCompilerFactory(community: CommunityCompilerFactory,
CypherCurrentCompiler(
planner,
EnterpriseRuntimeFactory.getRuntime(cypherRuntime, plannerConfig.useErrorsOverWarnings),
EnterpriseRuntimeContextCreator(GeneratedQueryStructure, log, plannerConfig, morselRuntimeState),
EnterpriseRuntimeContextCreator(GeneratedQueryStructure, log, plannerConfig, runtimeEnvironment),
kernelMonitors)

} else
community.createCompiler(cypherVersion, cypherPlanner, cypherRuntime, cypherUpdateStrategy)
}
}

case class RuntimeEnvironment(config:CypherRuntimeConfiguration, jobScheduler: JobScheduler) {
private val dispatcher: Dispatcher = createDispatcher()
val tracer: SchedulerTracer = createTracer()

def getDispatcher(debugOptions: Set[String]): Dispatcher =
if (singleThreadedRequested(debugOptions) && !isAlreadySingleThreaded)
new Dispatcher(config.morselSize, new SingleThreadScheduler())
else
dispatcher

private def singleThreadedRequested(debugOptions: Set[String]) = debugOptions.contains("singlethreaded")

private def isAlreadySingleThreaded = config.workers == 1

private def createDispatcher(): Dispatcher = {
val scheduler =
if (config.workers == 1) new SingleThreadScheduler()
else {
val numberOfThreads = if (config.workers == 0) java.lang.Runtime.getRuntime.availableProcessors() else config.workers
val executorService = jobScheduler.workStealingExecutor(Group.CYPHER_WORKER, numberOfThreads)
new SimpleScheduler(executorService)
}
new Dispatcher(config.morselSize, scheduler)
}

private def createTracer(): SchedulerTracer = {
if (config.doSchedulerTracing)
new DataPointSchedulerTracer(new ThreadSafeDataWriter(new CsvStdOutDataWriter))
else
SchedulerTracer.NoSchedulerTracer
}
}

/**
* Enterprise runtime context. Enriches the community runtime context with infrastructure needed for
* query compilation and parallel execution.
Expand All @@ -109,15 +143,15 @@ case class EnterpriseRuntimeContext(notificationLogger: InternalNotificationLogg
clock: Clock,
debugOptions: Set[String],
config: CypherPlannerConfiguration,
morselRuntimeState: MorselRuntimeState) extends RuntimeContext
morselRuntimeState: RuntimeEnvironment) extends RuntimeContext

/**
* Creator of EnterpriseRuntimeContext
*/
case class EnterpriseRuntimeContextCreator(codeStructure: CodeStructure[GeneratedQuery],
log: Log,
config: CypherPlannerConfiguration,
morselRuntimeState: MorselRuntimeState)
morselRuntimeState: RuntimeEnvironment)
extends RuntimeContextCreator[EnterpriseRuntimeContext] {

override def create(notificationLogger: InternalNotificationLogger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
package org.neo4j.cypher.internal

import org.neo4j.cypher.internal.compatibility.{CypherRuntime, CypherRuntimeConfiguration}
import org.neo4j.cypher.internal.compatibility.CypherRuntime
import org.neo4j.cypher.internal.compatibility.v3_5.runtime.PhysicalPlanningAttributes.SlotConfigurations
import org.neo4j.cypher.internal.compatibility.v3_5.runtime.SlotAllocation.PhysicalPlan
import org.neo4j.cypher.internal.compatibility.v3_5.runtime._
Expand All @@ -43,7 +43,6 @@ import org.neo4j.cypher.internal.runtime.vectorized.{Dispatcher, Pipeline, Pipel
import org.neo4j.cypher.internal.v3_5.logical.plans.LogicalPlan
import org.neo4j.cypher.result.QueryResult.QueryResultVisitor
import org.neo4j.graphdb.Notification
import org.neo4j.scheduler.{Group, JobScheduler}
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.ast.semantics.SemanticTable
import org.opencypher.v9_0.frontend.PlannerName
Expand Down Expand Up @@ -126,39 +125,6 @@ object MorselRuntime extends CypherRuntime[EnterpriseRuntimeContext] {
override def runtimeName: RuntimeName = MorselRuntimeName
}

case class MorselRuntimeState(config:CypherRuntimeConfiguration, jobScheduler: JobScheduler) {
private val dispatcher: Dispatcher = createDispatcher()
val tracer: SchedulerTracer = createTracer()

def getDispatcher(debugOptions: Set[String]): Dispatcher =
if (singleThreadedRequested(debugOptions) && !isAlreadySingleThreaded)
new Dispatcher(config.morselSize, new SingleThreadScheduler())
else
dispatcher

private def singleThreadedRequested(debugOptions: Set[String]) = debugOptions.contains("singlethreaded")

private def isAlreadySingleThreaded = config.workers == 1

private def createDispatcher(): Dispatcher = {
val scheduler =
if (config.workers == 1) new SingleThreadScheduler()
else {
val numberOfThreads = if (config.workers == 0) java.lang.Runtime.getRuntime.availableProcessors() else config.workers
val executorService = jobScheduler.workStealingExecutor(Group.CYPHER_WORKER, numberOfThreads)
new SimpleScheduler(executorService)
}
new Dispatcher(config.morselSize, scheduler)
}

private def createTracer(): SchedulerTracer = {
if (config.doSchedulerTracing)
new DataPointSchedulerTracer(new ThreadSafeDataWriter(new CsvStdOutDataWriter))
else
SchedulerTracer.NoSchedulerTracer
}
}

class VectorizedOperatorExecutionResult(operators: Pipeline,
logicalPlan: LogicalPlan,
executionPlanBuilder: () => InternalPlanDescription,
Expand Down

0 comments on commit 1de9adc

Please sign in to comment.