Skip to content

Commit

Permalink
Monitor CSV resources, and close them...
Browse files Browse the repository at this point in the history
This was not performed as they were added to a random extra TaskCloser,
which was completely unknown to the ClosingExecutionContext. Now CSV
resources are added to the QueryContexts ResourceManager, which is already
registered with the correct (and only) TaskCloser.
  • Loading branch information
fickludd committed Sep 5, 2018
1 parent a7f65c0 commit 1c0bce0
Show file tree
Hide file tree
Showing 20 changed files with 240 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,12 @@ case class CypherCurrentCompiler[CONTEXT <: RuntimeContext](planner: CypherPlann
queryType: InternalQueryType) extends ExecutableQuery {

private val searchMonitor = kernelMonitors.newMonitor(classOf[IndexSearchMonitor])
private val resourceMonitor = kernelMonitors.newMonitor(classOf[ResourceMonitor])

private def getQueryContext(transactionalContext: TransactionalContext) = {
val ctx = new TransactionBoundQueryContext(TransactionalContextWrapper(transactionalContext))(searchMonitor)
val ctx = new TransactionBoundQueryContext(TransactionalContextWrapper(transactionalContext),
new ResourceManager(resourceMonitor)
)(searchMonitor)
new ExceptionTranslatingQueryContext(ctx)
}

Expand All @@ -170,17 +173,16 @@ case class CypherCurrentCompiler[CONTEXT <: RuntimeContext](planner: CypherPlann
case CypherExecutionMode.profile => ProfileMode
case CypherExecutionMode.normal => NormalMode
}
val taskCloser = new TaskCloser
runSafely {

val queryContext = getQueryContext(transactionalContext)
taskCloser.addTask(queryContext.transactionalContext.close)
taskCloser.addTask(queryContext.resources.close)

val planDescriptionBuilder =
new PlanDescriptionBuilder(logicalPlan, plannerName, readOnly, cardinalities, executionPlan.runtimeName, executionPlan.metadata)

val taskCloser = new TaskCloser
taskCloser.addTask(queryContext.transactionalContext.close)
taskCloser.addTask(queryContext.resources.close)

val internalExecutionResult =
if (innerExecutionMode == ExplainMode) {
taskCloser.close(success = true)
Expand Down Expand Up @@ -213,7 +215,7 @@ case class CypherCurrentCompiler[CONTEXT <: RuntimeContext](planner: CypherPlann
kernelMonitors.newMonitor(classOf[QueryExecutionMonitor])
)
)
}
} (e => taskCloser.close(false))
}

def reusabilityState(lastCommittedTxId: () => Long, ctx: TransactionalContext): ReusabilityState = reusabilityState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,10 @@ object InterpretedRuntime extends CypherRuntime[RuntimeContext] {
readOnly: Boolean) extends ExecutionPlan {

override def run(queryContext: QueryContext, doProfile: Boolean, params: MapValue): RuntimeResult = {
val builder = resultBuilderFactory.create()

val profileInformation = new InterpretedProfileInformation
val builderContext = if (!readOnly || doProfile) new UpdateCountingQueryContext(queryContext) else queryContext
val builder = resultBuilderFactory.create(builderContext)

builder.setQueryContext(builderContext)
val profileInformation = new InterpretedProfileInformation

if (periodicCommit.isDefined) {
if (!builderContext.transactionalContext.isTopLevelTx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ExceptionTranslatingQueryContext(val inner: QueryContext) extends QueryCon

override def withActiveRead: QueryContext = inner.withActiveRead

override def resources: CloseableResource = inner.resources
override def resources: ResourceManager = inner.resources

override def transactionalContext =
new ExceptionTranslatingTransactionalContext(inner.transactionalContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.neo4j.cypher.internal.v3_5.logical.plans.LogicalPlan
import org.neo4j.cypher.result.{QueryProfile, RuntimeResult}
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.frontend.phases.InternalNotificationLogger
import org.opencypher.v9_0.util.{CypherException, TaskCloser}
import org.opencypher.v9_0.util.CypherException

import scala.collection.mutable

Expand All @@ -36,63 +36,38 @@ abstract class BaseExecutionResultBuilderFactory(pipe: Pipe,
columns: List[String],
logicalPlan: LogicalPlan) extends ExecutionResultBuilderFactory {
abstract class BaseExecutionWorkflowBuilder() extends ExecutionResultBuilder {
protected val taskCloser = new TaskCloser
protected var externalResource: ExternalCSVResource = new CSVResources(taskCloser)
protected var maybeQueryContext: Option[QueryContext] = None
protected var externalResource: ExternalCSVResource = new CSVResources(queryContext.resources)
protected var pipeDecorator: PipeDecorator = NullPipeDecorator
protected var exceptionDecorator: CypherException => CypherException = identity

protected def createQueryState(params: MapValue): QueryState

def setQueryContext(context: QueryContext) {
maybeQueryContext = Some(context)
}
def queryContext: QueryContext

def setLoadCsvPeriodicCommitObserver(batchRowCount: Long) {
def setLoadCsvPeriodicCommitObserver(batchRowCount: Long): Unit = {
val observer = new LoadCsvPeriodicCommitObserver(batchRowCount, externalResource, queryContext)
externalResource = observer
setExceptionDecorator(observer)
exceptionDecorator = observer
}

def setPipeDecorator(newDecorator: PipeDecorator) {
def setPipeDecorator(newDecorator: PipeDecorator): Unit =
pipeDecorator = newDecorator
}

def setExceptionDecorator(newDecorator: CypherException => CypherException) {
exceptionDecorator = newDecorator
}

override def build(params: MapValue,
notificationLogger: InternalNotificationLogger,
readOnly: Boolean,
queryProfile: QueryProfile): RuntimeResult = {
taskCloser.addTask(queryContext.transactionalContext.close)
taskCloser.addTask(queryContext.resources.close)
val state = createQueryState(params)
try {
createResults(state, notificationLogger, readOnly, queryProfile)
}
catch {
val results = pipe.createResults(state)
val resultIterator = buildResultIterator(results, readOnly)
new PipeExecutionResult(resultIterator, columns.toArray, state, queryProfile)
} catch {
case e: CypherException =>
taskCloser.close(success = false)
throw exceptionDecorator(e)
case t: Throwable =>
taskCloser.close(success = false)
throw t
}
}

private def createResults(state: QueryState,
notificationLogger: InternalNotificationLogger,
readOnly: Boolean,
queryProfile: QueryProfile): RuntimeResult = {
val results = pipe.createResults(state)
val resultIterator = buildResultIterator(results, readOnly)
new PipeExecutionResult(resultIterator, columns.toArray, state, queryProfile)
}

protected def queryContext: QueryContext = maybeQueryContext.get

protected def buildResultIterator(results: Iterator[ExecutionContext], readOnly: Boolean): IteratorBasedResult
}
}
Expand All @@ -104,9 +79,9 @@ case class InterpretedExecutionResultBuilderFactory(pipe: Pipe,
lenientCreateRelationship: Boolean)
extends BaseExecutionResultBuilderFactory(pipe, readOnly, columns, logicalPlan) {

override def create(): ExecutionResultBuilder = InterpretedExecutionWorkflowBuilder()
override def create(queryContext: QueryContext): ExecutionResultBuilder = InterpretedExecutionWorkflowBuilder(queryContext: QueryContext)

case class InterpretedExecutionWorkflowBuilder() extends BaseExecutionWorkflowBuilder {
case class InterpretedExecutionWorkflowBuilder(queryContext: QueryContext) extends BaseExecutionWorkflowBuilder {
override def createQueryState(params: MapValue): QueryState = {
new QueryState(queryContext,
externalResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@ import org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeDecorator
import org.neo4j.cypher.result.{QueryProfile, RuntimeResult}
import org.neo4j.values.virtual.MapValue
import org.opencypher.v9_0.frontend.phases.InternalNotificationLogger
import org.opencypher.v9_0.util.CypherException

trait ExecutionResultBuilder {
def setQueryContext(context: QueryContext)
def setLoadCsvPeriodicCommitObserver(batchRowCount: Long)
def setPipeDecorator(newDecorator: PipeDecorator)
def setExceptionDecorator(newDecorator: CypherException => CypherException)
def build(params: MapValue,
notificationLogger: InternalNotificationLogger,
readOnly: Boolean,
queryProfile: QueryProfile): RuntimeResult
}

trait ExecutionResultBuilderFactory {
def create(): ExecutionResultBuilder
def create(queryContext: QueryContext): ExecutionResultBuilder
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.neo4j.cypher.internal.compiler.v2_3.spi._
import org.neo4j.cypher.internal.frontend.v2_3.SemanticDirection.{BOTH, INCOMING, OUTGOING}
import org.neo4j.cypher.internal.frontend.v2_3.{Bound, EntityNotFoundException, FailedIndexException, SemanticDirection}
import org.neo4j.cypher.internal.javacompat.GraphDatabaseCypherService
import org.neo4j.cypher.internal.runtime.ResourceManager
import org.neo4j.cypher.internal.runtime.interpreted._
import org.neo4j.cypher.internal.spi.CursorIterator
import org.neo4j.graphalgo.impl.path.ShortestPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.neo4j.cypher.internal.compiler.v3_1.spi._
import org.neo4j.cypher.internal.frontend.v3_1.SemanticDirection.{BOTH, INCOMING, OUTGOING}
import org.neo4j.cypher.internal.frontend.v3_1.{Bound, EntityNotFoundException, FailedIndexException, SemanticDirection}
import org.neo4j.cypher.internal.javacompat.GraphDatabaseCypherService
import org.neo4j.cypher.internal.runtime.interpreted.ResourceManager
import org.neo4j.cypher.internal.runtime.ResourceManager
import org.neo4j.cypher.internal.spi.CursorIterator
import org.neo4j.cypher.internal.spi.v3_1.TransactionBoundQueryContext.IndexSearchMonitor
import org.neo4j.graphalgo.impl.path.ShortestPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@ import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.neo4j.cypher.internal.runtime.interpreted.commands.convert.{CommunityExpressionConverter, ExpressionConverters}
import org.neo4j.cypher.internal.runtime.{CloseableResource, QueryContext, QueryTransactionalContext}
import org.neo4j.cypher.internal.runtime.{CloseableResource, NormalMode, QueryContext, QueryTransactionalContext}
import org.neo4j.cypher.internal.runtime.{QueryContext, QueryTransactionalContext, ResourceManager}
import org.neo4j.cypher.internal.v3_5.logical.plans._
import org.neo4j.cypher.result.RuntimeResult
import org.neo4j.internal.kernel.api.Procedures
import org.neo4j.values.storable.LongValue
import org.neo4j.values.virtual.VirtualValues.EMPTY_MAP
import org.opencypher.v9_0.expressions._
import org.opencypher.v9_0.util.DummyPosition
import org.opencypher.v9_0.util.attribution.SequentialIdGen
import org.opencypher.v9_0.util.symbols._
import org.opencypher.v9_0.util.test_helpers.CypherFunSuite

import scala.collection.JavaConverters._
import org.opencypher.v9_0.util.attribution.SequentialIdGen
import org.opencypher.v9_0.util.symbols._
import org.opencypher.v9_0.util.test_helpers.CypherFunSuite

class ProcedureCallExecutionPlanTest extends CypherFunSuite {

Expand Down Expand Up @@ -113,7 +110,7 @@ class ProcedureCallExecutionPlanTest extends CypherFunSuite {

private val pos = DummyPosition(-1)
val ctx = mock[QueryContext]
when(ctx.resources).thenReturn(mock[CloseableResource])
when(ctx.resources).thenReturn(mock[ResourceManager])
var iteratorExhausted = false

val procedureResult = new Answer[Iterator[Array[AnyRef]]] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.neo4j.csv.reader._
import org.neo4j.cypher.internal.runtime.interpreted.pipes.ExternalCSVResource
import org.opencypher.v9_0.util.{LoadExternalResourceException, TaskCloser}
import org.neo4j.cypher.CypherExecutionException
import org.neo4j.cypher.internal.runtime.ResourceManager
import sun.net.www.protocol.http.HttpURLConnection

import scala.collection.mutable.ArrayBuffer
Expand All @@ -54,7 +55,11 @@ object CSVResources {
}
}

class CSVResources(cleaner: TaskCloser) extends ExternalCSVResource {
case class CSVResource(url: URL, resource: AutoCloseable) extends AutoCloseable {
override def close(): Unit = resource.close()
}

class CSVResources(resourceManager: ResourceManager) extends ExternalCSVResource {

def getCsvIterator(url: URL, fieldTerminator: Option[String], legacyCsvQuoteEscaping: Boolean, bufferSize: Int,
headers: Boolean = false): Iterator[Array[String]] = {
Expand All @@ -66,9 +71,7 @@ class CSVResources(cleaner: TaskCloser) extends ExternalCSVResource {
val intDelimiter = delimiter.toInt
val mark = new Mark

cleaner.addTask(_ => {
seeker.close()
})
resourceManager.trace(CSVResource(url, seeker))

new Iterator[Array[String]] {
private def readNextRow: Array[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ abstract class DelegatingQueryContext(val inner: QueryContext) extends QueryCont
protected def manyDbHits[A](value: RelationshipSelectionCursor): RelationshipSelectionCursor = value
protected def manyDbHits(count: Int): Int = count

override def resources: CloseableResource = inner.resources
override def resources: ResourceManager = inner.resources

override def transactionalContext: QueryTransactionalContext = inner.transactionalContext

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.neo4j.internal.kernel.api.helpers._
import org.neo4j.internal.kernel.api.procs.{UserAggregator, QualifiedName => KernelQualifiedName}
import org.neo4j.io.IOUtils
import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.api._
import org.neo4j.kernel.api.{ResourceManager => NotTheTypeWeWant, _}
import org.neo4j.kernel.api.exceptions.schema.{AlreadyConstrainedException, AlreadyIndexedException}
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory
import org.neo4j.kernel.api.schema.constraints.ConstraintDescriptorFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.net.URL
import org.apache.commons.lang3.SystemUtils
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.neo4j.cypher.internal.runtime.CreateTempFileTestSupport
import org.neo4j.cypher.internal.runtime.{CreateTempFileTestSupport, ResourceManager}
import org.opencypher.v9_0.util.{LoadExternalResourceException, TaskCloser}
import org.opencypher.v9_0.util.test_helpers.CypherFunSuite
import org.neo4j.cypher.internal.runtime.interpreted.CSVResources.DEFAULT_BUFFER_SIZE
Expand All @@ -33,10 +33,10 @@ import org.neo4j.io.fs.FileUtils
class CSVResourcesTest extends CypherFunSuite with CreateTempFileTestSupport {

var resources: CSVResources = _
var cleaner: TaskCloser = _
var cleaner: ResourceManager = _

override def beforeEach() {
cleaner = mock[TaskCloser]
cleaner = mock[ResourceManager]
resources = new CSVResources(cleaner)
}

Expand Down Expand Up @@ -124,7 +124,7 @@ class CSVResourcesTest extends CypherFunSuite with CreateTempFileTestSupport {
result should equal(List.empty)
}

test("should register a task in the cleanupper") {
test("should register a task in the resource manager") {
// given
val url = createCSVTempFileURL {
writer =>
Expand All @@ -137,7 +137,7 @@ class CSVResourcesTest extends CypherFunSuite with CreateTempFileTestSupport {
resources.getCsvIterator(new URL(url), None, legacyCsvQuoteEscaping = false, DEFAULT_BUFFER_SIZE)

// then
verify(cleaner, times(1)).addTask(any(classOf[Boolean => Unit]))
verify(cleaner, times(1)).trace(any(classOf[AutoCloseable]))
}

test("should accept and use a custom field terminator") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ trait QueryContextAdaptation {

override def withActiveRead: QueryContext = ???

override def resources: CloseableResource = ???
override def resources: ResourceManager = ???

override def getOrCreatePropertyKeyId(propertyKey: String): Int = ???

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class LimitPipeTest extends CypherFunSuite {
class DummyPipe(inputIterator: Iterator[ExecutionContext]) extends Pipe {
override protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = ???

override def id(): Id = ???
override def id: Id = ???

override def createResults(state: QueryState): Iterator[ExecutionContext] = inputIterator
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait PipeTestSupport extends CypherTestSupport with MockitoSugar {
protected def internalCreateResults(state: QueryState) = f(state)

// Used by profiling to identify where to report dbhits and rows
override def id(): Id = Id.INVALID_ID
override def id: Id = Id.INVALID_ID
}

def row(values: (String, Any)*) = ExecutionContext.from(values.map(v => (v._1, ValueUtils.of(v._2))): _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ trait QueryContext extends TokenContext with DbAccess {

def withActiveRead: QueryContext

def resources: CloseableResource
def resources: ResourceManager

def nodeOps: Operations[NodeValue]

Expand Down Expand Up @@ -341,4 +341,4 @@ case class IndexedPrimitiveNodeWithProperties(node: Long, values: Array[Value])
val state = Seq(node, values.toSeq)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
}

0 comments on commit 1c0bce0

Please sign in to comment.