Skip to content

Commit

Permalink
Fixed issue with last committed tx id in query planning
Browse files Browse the repository at this point in the history
This issue could result in slaves never re-planing queries after a mode switch
or even directly after the startup. It occurred because function that resolved
last committed transaction id held a reference to a stale TransactionIdStore.
TransactionIdStore is re-initialized after the NeoStoreDataSource restart which
happens after mode switch in HA.

Introduced LastCommittedTxIdProvider that is able to take last committed
transaction id from the given GraphDatabaseService.
  • Loading branch information
Max Sumrall committed Nov 23, 2015
1 parent f023de1 commit dcef31c
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ExecutionPlanBuilder(graph: GraphDatabaseService, statsDivergenceThreshold

def isPeriodicCommit = periodicCommitInfo.isDefined
def plannerUsed = planner
def isStale(lastTxId: () => Long, statistics: GraphStatistics) = fingerprint.isStale(lastTxId, statistics)
def isStale(lastCommittedTxId: () => Long, statistics: GraphStatistics) = fingerprint.isStale(lastCommittedTxId, statistics)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ case class PlanFingerprint(creationTimeMillis: Long, txId: Long, snapshot: Graph
case class PlanFingerprintReference(clock: Clock, ttl: Long, statsDivergenceThreshold : Double,
var fingerprint: Option[PlanFingerprint])
{
def isStale(lastTxId: () => Long, statistics: GraphStatistics): Boolean = {
def isStale(lastCommittedTxId: () => Long, statistics: GraphStatistics): Boolean = {
fingerprint.fold(false) { f =>
lazy val currentTimeMillis = clock.currentTimeMillis()
lazy val currentTxId = lastTxId()
lazy val currentTxId = lastCommittedTxId()

check(f.creationTimeMillis + ttl <= currentTimeMillis, {}) &&
check(currentTxId != f.txId, { fingerprint = Some(f.copy(creationTimeMillis = currentTimeMillis)) }) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ package org.neo4j.cypher

import java.util.{Map => JavaMap}

import org.neo4j.cypher.internal.compiler.v2_2._
import org.neo4j.cypher.internal.compiler.v2_2.helpers.LRUCache
import org.neo4j.cypher.internal.compiler.v2_2.parser.ParserMonitor
import org.neo4j.cypher.internal.compiler.v2_2.prettifier.Prettifier
import org.neo4j.cypher.internal.compiler.v2_2._
import org.neo4j.cypher.internal.{CypherCompiler, _}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.kernel.impl.query.{QueryEngineProvider, QueryExecutionMonitor, QuerySession}
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore
import org.neo4j.kernel.impl.util.StringLogger
import org.neo4j.kernel.{GraphDatabaseAPI, InternalAbstractGraphDatabase, api, monitoring}

Expand All @@ -47,8 +46,7 @@ class ExecutionEngine(graph: GraphDatabaseService, logger: StringLogger = String
protected val isServer = false
protected val graphAPI = graph.asInstanceOf[GraphDatabaseAPI]
protected val kernel = graphAPI.getDependencyResolver.resolveDependency(classOf[org.neo4j.kernel.api.KernelAPI])
private val lastTxId: () => Long =
graphAPI.getDependencyResolver.resolveDependency( classOf[TransactionIdStore]).getLastCommittedTransactionId
private val lastCommittedTxId = LastCommittedTxIdProvider(graphAPI)
protected val kernelMonitors: monitoring.Monitors = graphAPI.getDependencyResolver.resolveDependency(classOf[org.neo4j.kernel.monitoring.Monitors])
protected val compiler = createCompiler(logger)

Expand Down Expand Up @@ -154,7 +152,7 @@ class ExecutionEngine(graph: GraphDatabaseService, logger: StringLogger = String
parsedQuery.plan(kernelStatement)
})
}.flatMap { case (candidatePlan, params) =>
if (!touched && candidatePlan.isStale(lastTxId, kernelStatement)) {
if (!touched && candidatePlan.isStale(lastCommittedTxId, kernelStatement)) {
cacheAccessor.remove(cache)(cacheKey)
None
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ trait ExecutionPlan {

def isPeriodicCommit: Boolean

def isStale(lastTxId: () => Long, statement: Statement): Boolean
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal

import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore

case class LastCommittedTxIdProvider(db: GraphDatabaseService) extends (() => Long) {

override def apply(): Long = {
val resolver = db.asInstanceOf[GraphDatabaseAPI].getDependencyResolver
val txIdStore = resolver.resolveDependency(classOf[TransactionIdStore])
txIdStore.getLastCommittedTransactionId
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.neo4j.cypher.CypherVersion
import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v1_9.executionplan.{ExecutionPlan => ExecutionPlan_v1_9}
import org.neo4j.cypher.internal.compiler.v1_9.{CypherCompiler => CypherCompiler1_9}
import org.neo4j.cypher.internal.compiler.v2_2.{ProfileMode, NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_2.{ExecutionMode, NormalMode, ProfileMode}
import org.neo4j.cypher.internal.spi.v1_9.{GDSBackedQueryContext => QueryContext_v1_9}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -71,6 +71,6 @@ case class CompatibilityFor1_9(graph: GraphDatabaseService, queryCacheSize: Int,

def isPeriodicCommit = false

def isStale(lastTxId: () => Long, statement: Statement): Boolean = false
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v2_0.CypherCompiler
import org.neo4j.cypher.internal.compiler.v2_0.executionplan.{ExecutionPlan => ExecutionPlan_v2_0}
import org.neo4j.cypher.internal.compiler.v2_0.spi.{ExceptionTranslatingQueryContext => ExceptionTranslatingQueryContext_v2_0}
import org.neo4j.cypher.internal.compiler.v2_2.{ProfileMode, NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_2.{ExecutionMode, NormalMode, ProfileMode}
import org.neo4j.cypher.internal.spi.v2_0.{TransactionBoundPlanContext => PlanContext_v2_0, TransactionBoundQueryContext => QueryContext_v2_0}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -74,6 +74,6 @@ case class CompatibilityFor2_0(graph: GraphDatabaseService, queryCacheSize: Int,

def isPeriodicCommit = false

def isStale(lastTxId: () => Long, statement: Statement): Boolean = false
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compiler.v2_1.CypherCompilerFactory
import org.neo4j.cypher.internal.compiler.v2_1.executionplan.{ExecutionPlan => ExecutionPlan_v2_1}
import org.neo4j.cypher.internal.compiler.v2_1.spi.{ExceptionTranslatingQueryContext => ExceptionTranslatingQueryContext_v2_1}
import org.neo4j.cypher.internal.compiler.v2_2.{ProfileMode, NormalMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_2.{ExecutionMode, NormalMode, ProfileMode}
import org.neo4j.cypher.internal.spi.v2_1.{TransactionBoundPlanContext => PlanContext_v2_1, TransactionBoundQueryContext => QueryContext_v2_1}
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -77,6 +77,6 @@ case class CompatibilityFor2_1(graph: GraphDatabaseService, queryCacheSize: Int,

def isPeriodicCommit = inner.isPeriodicCommit

def isStale(lastTxId: () => Long, statement: Statement): Boolean = false
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.neo4j.cypher.internal.compiler.v2_2.tracing.rewriters.RewriterStepSeq
import org.neo4j.cypher.internal.compiler.v2_2.{CypherException => CypherException_v2_2, _}
import org.neo4j.cypher.internal.spi.v2_2.{TransactionBoundGraphStatistics, TransactionBoundPlanContext, TransactionBoundQueryContext}
import org.neo4j.cypher.javacompat.ProfilerStatistics
import org.neo4j.cypher.{ArithmeticException, CypherTypeException, EntityNotFoundException, FailedIndexException, IncomparableValuesException, IndexHintException, InternalException, InvalidArgumentException, InvalidSemanticsException, LabelScanHintException, LoadCsvStatusWrapCypherException, LoadExternalResourceException, MergeConstraintConflictException, NodeStillHasRelationshipsException, ParameterNotFoundException, ParameterWrongTypeException, PatternException, PeriodicCommitInOpenTransactionException, ProfilerStatisticsNotReadyException, SyntaxException, UniquePathNotUniqueException, UnknownLabelException, HintException, _}
import org.neo4j.cypher.{ArithmeticException, CypherTypeException, EntityNotFoundException, FailedIndexException, HintException, IncomparableValuesException, IndexHintException, InternalException, InvalidArgumentException, InvalidSemanticsException, LabelScanHintException, LoadCsvStatusWrapCypherException, LoadExternalResourceException, MergeConstraintConflictException, NodeStillHasRelationshipsException, ParameterNotFoundException, ParameterWrongTypeException, PatternException, PeriodicCommitInOpenTransactionException, ProfilerStatisticsNotReadyException, SyntaxException, UniquePathNotUniqueException, UnknownLabelException, _}
import org.neo4j.graphdb.{GraphDatabaseService, QueryExecutionType, ResourceIterator}
import org.neo4j.helpers.{Assertion, Clock}
import org.neo4j.kernel.GraphDatabaseAPI
Expand Down Expand Up @@ -181,8 +181,8 @@ trait CompatibilityFor2_2 {

def isPeriodicCommit = inner.isPeriodicCommit

def isStale(lastTxId: () => Long, statement: Statement) =
inner.isStale(lastTxId, TransactionBoundGraphStatistics(statement))
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement) =
inner.isStale(lastCommittedTxId, TransactionBoundGraphStatistics(statement))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
package org.neo4j.cypher.internal.spi.v2_2

import org.neo4j.cypher.MissingIndexException
import org.neo4j.cypher.internal.LastCommittedTxIdProvider
import org.neo4j.cypher.internal.compiler.v2_2.spi._
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.api.Statement
import org.neo4j.kernel.api.constraints.UniquenessConstraint
import org.neo4j.kernel.api.exceptions.KernelException
import org.neo4j.kernel.api.exceptions.schema.SchemaRuleNotFoundException
import org.neo4j.kernel.api.index.{IndexDescriptor, InternalIndexState}
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore

class TransactionBoundPlanContext(someStatement: Statement, val gdb: GraphDatabaseService)
extends TransactionBoundTokenContext(someStatement) with PlanContext {
Expand Down Expand Up @@ -89,8 +88,5 @@ class TransactionBoundPlanContext(someStatement: Statement, val gdb: GraphDataba
val statistics: GraphStatistics =
InstrumentedGraphStatistics(TransactionBoundGraphStatistics(statement), MutableGraphStatisticsSnapshot())

val txIdProvider: () => Long = gdb.asInstanceOf[GraphDatabaseAPI]
.getDependencyResolver
.resolveDependency(classOf[TransactionIdStore])
.getLastCommittedTransactionId
val txIdProvider = LastCommittedTxIdProvider(gdb)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.neo4j.cypher.internal.compiler.v2_2.planner.logical.plans.LogicalPlan
import org.neo4j.graphdb._
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.io.fs.FileUtils
import org.neo4j.kernel.TopLevelTransaction
import org.neo4j.test.{TestGraphDatabaseFactory, ImpermanentGraphDatabase}
import org.neo4j.kernel.{NeoStoreDataSource, TopLevelTransaction}
import org.neo4j.test.{ImpermanentGraphDatabase, TestGraphDatabaseFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -1076,6 +1076,26 @@ order by a.COL1""")
))
}

test("replanning should happen after data source restart") {
val planningListener = PlanningListener()
kernelMonitors.addMonitorListener(planningListener)

val result1 = eengine.execute("match (n) return n").toList
result1 shouldBe empty

val ds = graph.getDependencyResolver.resolveDependency(classOf[NeoStoreDataSource])
ds.stop()
ds.start()

val result2 = eengine.execute("match (n) return n").toList
result2 shouldBe empty

planningListener.planRequests.toSeq should equal(Seq(
s"match (n) return n",
s"match (n) return n"
))
}

private def createReadOnlyEngine(): ExecutionEngine = {
FileUtils.deleteRecursively(new File("target/readonly"))
val old = new TestGraphDatabaseFactory().newEmbeddedDatabase("target/readonly")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal

import org.neo4j.cypher.internal.commons.CypherFunSuite
import org.neo4j.kernel.NeoStoreDataSource
import org.neo4j.test.ImpermanentGraphDatabase
import org.scalatest.BeforeAndAfterAll

class LastCommittedTxIdProviderTest extends CypherFunSuite with BeforeAndAfterAll {

var db: ImpermanentGraphDatabase = null
var lastCommittedTxIdProvider: LastCommittedTxIdProvider = null

override protected def beforeAll(): Unit = {
db = new ImpermanentGraphDatabase()
lastCommittedTxIdProvider = LastCommittedTxIdProvider(db)
}

override protected def afterAll(): Unit = db.shutdown()

test("should return correct last committed tx id") {
val startingTxId = lastCommittedTxIdProvider()

(1 to 42).foreach(_ => createNode())

val endingTxId = lastCommittedTxIdProvider()
endingTxId shouldBe (startingTxId + 42)
}

test("should return correct last committed tx id after datasource restart") {
val startingTxId = lastCommittedTxIdProvider()

restartDataSource()
(1 to 42).foreach(_ => createNode())

val endingTxId = lastCommittedTxIdProvider()
endingTxId shouldBe (startingTxId + 42)
}

private def createNode(): Unit = {
val tx = db.beginTx()
try {
db.createNode()
tx.success()
}
finally {
tx.close()
}
}

private def restartDataSource(): Unit = {
val ds = db.getDependencyResolver.resolveDependency(classOf[NeoStoreDataSource])
ds.stop()
ds.start()
}
}

0 comments on commit dcef31c

Please sign in to comment.