Skip to content

Commit

Permalink
Merge pull request #5935 from maxsumrall/2.2-slave-replanning
Browse files Browse the repository at this point in the history
Fixed issue with last committed tx id in query planning
  • Loading branch information
davidegrohmann committed Nov 24, 2015
2 parents 2b80c27 + 7398b85 commit 2565b40
Show file tree
Hide file tree
Showing 16 changed files with 477 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ExecutionPlanBuilder(graph: GraphDatabaseService, statsDivergenceThreshold

def isPeriodicCommit = periodicCommitInfo.isDefined
def plannerUsed = planner
def isStale(lastTxId: () => Long, statistics: GraphStatistics) = fingerprint.isStale(lastTxId, statistics)
def isStale(lastCommittedTxId: () => Long, statistics: GraphStatistics) = fingerprint.isStale(lastCommittedTxId, statistics)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ case class PlanFingerprint(creationTimeMillis: Long, txId: Long, snapshot: Graph
case class PlanFingerprintReference(clock: Clock, ttl: Long, statsDivergenceThreshold : Double,
var fingerprint: Option[PlanFingerprint])
{
def isStale(lastTxId: () => Long, statistics: GraphStatistics): Boolean = {
def isStale(lastCommittedTxId: () => Long, statistics: GraphStatistics): Boolean = {
fingerprint.fold(false) { f =>
lazy val currentTimeMillis = clock.currentTimeMillis()
lazy val currentTxId = lastTxId()
lazy val currentTxId = lastCommittedTxId()

check(f.creationTimeMillis + ttl <= currentTimeMillis, {}) &&
check(currentTxId != f.txId, { fingerprint = Some(f.copy(creationTimeMillis = currentTimeMillis)) }) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ package org.neo4j.cypher

import java.util.{Map => JavaMap}

import org.neo4j.cypher.internal.compiler.v2_2._
import org.neo4j.cypher.internal.compiler.v2_2.helpers.LRUCache
import org.neo4j.cypher.internal.compiler.v2_2.parser.ParserMonitor
import org.neo4j.cypher.internal.compiler.v2_2.prettifier.Prettifier
import org.neo4j.cypher.internal.compiler.v2_2._
import org.neo4j.cypher.internal.{CypherCompiler, _}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.kernel.impl.query.{QueryEngineProvider, QueryExecutionMonitor, QuerySession}
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore
import org.neo4j.kernel.impl.util.StringLogger
import org.neo4j.kernel.{GraphDatabaseAPI, InternalAbstractGraphDatabase, api, monitoring}

Expand All @@ -47,8 +46,7 @@ class ExecutionEngine(graph: GraphDatabaseService, logger: StringLogger = String
protected val isServer = false
protected val graphAPI = graph.asInstanceOf[GraphDatabaseAPI]
protected val kernel = graphAPI.getDependencyResolver.resolveDependency(classOf[org.neo4j.kernel.api.KernelAPI])
private val lastTxId: () => Long =
graphAPI.getDependencyResolver.resolveDependency( classOf[TransactionIdStore]).getLastCommittedTransactionId
private val lastCommittedTxId = LastCommittedTxIdProvider(graphAPI)
protected val kernelMonitors: monitoring.Monitors = graphAPI.getDependencyResolver.resolveDependency(classOf[org.neo4j.kernel.monitoring.Monitors])
protected val compiler = createCompiler(logger)

Expand Down Expand Up @@ -154,7 +152,7 @@ class ExecutionEngine(graph: GraphDatabaseService, logger: StringLogger = String
parsedQuery.plan(kernelStatement)
})
}.flatMap { case (candidatePlan, params) =>
if (!touched && candidatePlan.isStale(lastTxId, kernelStatement)) {
if (!touched && candidatePlan.isStale(lastCommittedTxId, kernelStatement)) {
cacheAccessor.remove(cache)(cacheKey)
None
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ trait ExecutionPlan {

def isPeriodicCommit: Boolean

def isStale(lastTxId: () => Long, statement: Statement): Boolean
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal

import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore

case class LastCommittedTxIdProvider(db: GraphDatabaseService) extends (() => Long) {

private val resolver = db.asInstanceOf[GraphDatabaseAPI].getDependencyResolver

override def apply(): Long = {
val txIdStore = resolver.resolveDependency(classOf[TransactionIdStore])
txIdStore.getLastCommittedTransactionId
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.neo4j.cypher.CypherVersion
import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v1_9.executionplan.{ExecutionPlan => ExecutionPlan_v1_9}
import org.neo4j.cypher.internal.compiler.v1_9.{CypherCompiler => CypherCompiler1_9}
import org.neo4j.cypher.internal.compiler.v2_2.{ProfileMode, NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_2.{ExecutionMode, NormalMode, ProfileMode}
import org.neo4j.cypher.internal.spi.v1_9.{GDSBackedQueryContext => QueryContext_v1_9}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -71,6 +71,6 @@ case class CompatibilityFor1_9(graph: GraphDatabaseService, queryCacheSize: Int,

def isPeriodicCommit = false

def isStale(lastTxId: () => Long, statement: Statement): Boolean = false
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v2_0.CypherCompiler
import org.neo4j.cypher.internal.compiler.v2_0.executionplan.{ExecutionPlan => ExecutionPlan_v2_0}
import org.neo4j.cypher.internal.compiler.v2_0.spi.{ExceptionTranslatingQueryContext => ExceptionTranslatingQueryContext_v2_0}
import org.neo4j.cypher.internal.compiler.v2_2.{ProfileMode, NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_2.{ExecutionMode, NormalMode, ProfileMode}
import org.neo4j.cypher.internal.spi.v2_0.{TransactionBoundPlanContext => PlanContext_v2_0, TransactionBoundQueryContext => QueryContext_v2_0}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -74,6 +74,6 @@ case class CompatibilityFor2_0(graph: GraphDatabaseService, queryCacheSize: Int,

def isPeriodicCommit = false

def isStale(lastTxId: () => Long, statement: Statement): Boolean = false
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v2_1.CypherCompilerFactory
import org.neo4j.cypher.internal.compiler.v2_1.executionplan.{ExecutionPlan => ExecutionPlan_v2_1}
import org.neo4j.cypher.internal.compiler.v2_1.spi.{ExceptionTranslatingQueryContext => ExceptionTranslatingQueryContext_v2_1}
import org.neo4j.cypher.internal.compiler.v2_2.{ProfileMode, NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_2.{ExecutionMode, NormalMode, ProfileMode}
import org.neo4j.cypher.internal.spi.v2_1.{TransactionBoundPlanContext => PlanContext_v2_1, TransactionBoundQueryContext => QueryContext_v2_1}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -77,6 +77,6 @@ case class CompatibilityFor2_1(graph: GraphDatabaseService, queryCacheSize: Int,

def isPeriodicCommit = inner.isPeriodicCommit

def isStale(lastTxId: () => Long, statement: Statement): Boolean = false
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.neo4j.cypher.internal.compiler.v2_2.tracing.rewriters.RewriterStepSeq
import org.neo4j.cypher.internal.compiler.v2_2.{CypherException => CypherException_v2_2, _}
import org.neo4j.cypher.internal.spi.v2_2.{TransactionBoundGraphStatistics, TransactionBoundPlanContext, TransactionBoundQueryContext}
import org.neo4j.cypher.javacompat.ProfilerStatistics
import org.neo4j.cypher.{ArithmeticException, CypherTypeException, EntityNotFoundException, FailedIndexException, IncomparableValuesException, IndexHintException, InternalException, InvalidArgumentException, InvalidSemanticsException, LabelScanHintException, LoadCsvStatusWrapCypherException, LoadExternalResourceException, MergeConstraintConflictException, NodeStillHasRelationshipsException, ParameterNotFoundException, ParameterWrongTypeException, PatternException, PeriodicCommitInOpenTransactionException, ProfilerStatisticsNotReadyException, SyntaxException, UniquePathNotUniqueException, UnknownLabelException, HintException, _}
import org.neo4j.cypher.{ArithmeticException, CypherTypeException, EntityNotFoundException, FailedIndexException, HintException, IncomparableValuesException, IndexHintException, InternalException, InvalidArgumentException, InvalidSemanticsException, LabelScanHintException, LoadCsvStatusWrapCypherException, LoadExternalResourceException, MergeConstraintConflictException, NodeStillHasRelationshipsException, ParameterNotFoundException, ParameterWrongTypeException, PatternException, PeriodicCommitInOpenTransactionException, ProfilerStatisticsNotReadyException, SyntaxException, UniquePathNotUniqueException, UnknownLabelException, _}
import org.neo4j.graphdb.{GraphDatabaseService, QueryExecutionType, ResourceIterator}
import org.neo4j.helpers.{Assertion, Clock}
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -181,8 +181,8 @@ trait CompatibilityFor2_2 {

def isPeriodicCommit = inner.isPeriodicCommit

def isStale(lastTxId: () => Long, statement: Statement) =
inner.isStale(lastTxId, TransactionBoundGraphStatistics(statement))
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement) =
inner.isStale(lastCommittedTxId, TransactionBoundGraphStatistics(statement))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
package org.neo4j.cypher.internal.spi.v2_2

import org.neo4j.cypher.MissingIndexException
import org.neo4j.cypher.internal.LastCommittedTxIdProvider
import org.neo4j.cypher.internal.compiler.v2_2.spi._
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.api.Statement
import org.neo4j.kernel.api.constraints.UniquenessConstraint
import org.neo4j.kernel.api.exceptions.KernelException
import org.neo4j.kernel.api.exceptions.schema.SchemaRuleNotFoundException
import org.neo4j.kernel.api.index.{IndexDescriptor, InternalIndexState}
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore

class TransactionBoundPlanContext(someStatement: Statement, val gdb: GraphDatabaseService)
extends TransactionBoundTokenContext(someStatement) with PlanContext {
Expand Down Expand Up @@ -89,8 +88,5 @@ class TransactionBoundPlanContext(someStatement: Statement, val gdb: GraphDataba
val statistics: GraphStatistics =
InstrumentedGraphStatistics(TransactionBoundGraphStatistics(statement), MutableGraphStatisticsSnapshot())

val txIdProvider: () => Long = gdb.asInstanceOf[GraphDatabaseAPI]
.getDependencyResolver
.resolveDependency(classOf[TransactionIdStore])
.getLastCommittedTransactionId
val txIdProvider = LastCommittedTxIdProvider(gdb)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.neo4j.cypher.internal.compiler.v2_2.planner.logical.plans.LogicalPlan
import org.neo4j.graphdb._
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.io.fs.FileUtils
import org.neo4j.kernel.TopLevelTransaction
import org.neo4j.test.{TestGraphDatabaseFactory, ImpermanentGraphDatabase}
import org.neo4j.kernel.{NeoStoreDataSource, TopLevelTransaction}
import org.neo4j.test.{ImpermanentGraphDatabase, TestGraphDatabaseFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -1076,6 +1076,26 @@ order by a.COL1""")
))
}

test("replanning should happen after data source restart") {
val planningListener = PlanningListener()
kernelMonitors.addMonitorListener(planningListener)

val result1 = eengine.execute("match (n) return n").toList
result1 shouldBe empty

val ds = graph.getDependencyResolver.resolveDependency(classOf[NeoStoreDataSource])
ds.stop()
ds.start()

val result2 = eengine.execute("match (n) return n").toList
result2 shouldBe empty

planningListener.planRequests.toSeq should equal(Seq(
s"match (n) return n",
s"match (n) return n"
))
}

private def createReadOnlyEngine(): ExecutionEngine = {
FileUtils.deleteRecursively(new File("target/readonly"))
val old = new TestGraphDatabaseFactory().newEmbeddedDatabase("target/readonly")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal

import org.neo4j.cypher.internal.commons.CypherFunSuite
import org.neo4j.kernel.NeoStoreDataSource
import org.neo4j.test.ImpermanentGraphDatabase
import org.scalatest.BeforeAndAfterAll

class LastCommittedTxIdProviderTest extends CypherFunSuite with BeforeAndAfterAll {

var db: ImpermanentGraphDatabase = null
var lastCommittedTxIdProvider: LastCommittedTxIdProvider = null

override protected def beforeAll(): Unit = {
db = new ImpermanentGraphDatabase()
lastCommittedTxIdProvider = LastCommittedTxIdProvider(db)
}

override protected def afterAll(): Unit = db.shutdown()

test("should return correct last committed tx id") {
val startingTxId = lastCommittedTxIdProvider()

(1 to 42).foreach(_ => createNode())

val endingTxId = lastCommittedTxIdProvider()
endingTxId shouldBe (startingTxId + 42)
}

test("should return correct last committed tx id after datasource restart") {
val startingTxId = lastCommittedTxIdProvider()

restartDataSource()
(1 to 42).foreach(_ => createNode())

val endingTxId = lastCommittedTxIdProvider()
endingTxId shouldBe (startingTxId + 42)
}

private def createNode(): Unit = {
val tx = db.beginTx()
try {
db.createNode()
tx.success()
}
finally {
tx.close()
}
}

private def restartDataSource(): Unit = {
val ds = db.getDependencyResolver.resolveDependency(classOf[NeoStoreDataSource])
ds.stop()
ds.start()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.neo4j.kernel.impl.util.StringLogger;

import static java.nio.ByteBuffer.wrap;

import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.UTF8.encode;
import static org.neo4j.io.fs.FileUtils.windowsSafeIOOperation;
Expand Down Expand Up @@ -123,7 +122,7 @@ public CommonAbstractStore(
{
try
{
storeFile.close();
closeStoreFile();
}
catch ( IOException failureToClose )
{
Expand Down Expand Up @@ -712,6 +711,19 @@ public void flush()
}
}

/**
* Checks if this store is closed and throws exception if it is.
*
* @throws IllegalStateException if the store is closed
*/
protected void assertNotClosed()
{
if ( storeFile == null )
{
throw new IllegalStateException( this + " for file '" + storageFileName + "' is closed" );
}
}

/**
* Closes this store. This will cause all buffers and channels to be closed.
* Requesting an operation from after this method has been invoked is
Expand All @@ -731,7 +743,7 @@ public void close()
closeStorage();
try
{
storeFile.close();
closeStoreFile();
}
catch ( IOException e )
{
Expand Down Expand Up @@ -776,7 +788,19 @@ public void perform() throws IOException
}
}

protected void releaseFileLockAndCloseFileChannel()
private void closeStoreFile() throws IOException
{
try
{
storeFile.close();
}
finally
{
storeFile = null;
}
}

private void releaseFileLockAndCloseFileChannel()
{
try
{
Expand Down

0 comments on commit 2565b40

Please sign in to comment.