Skip to content

Commit

Permalink
Fix bug in detach delete, which could cause deadlocks
Browse files Browse the repository at this point in the history
This bug was caused by TwoPhaseNodeForRelationshipLocking not
guaranteeing to acquire locks in ascending node id order. This was
tested, but the 3 tested nodes coincidentally were provided in the
correct order by that iterator, causing the test to pass.
  • Loading branch information
fickludd committed Sep 29, 2017
1 parent efa67a6 commit 842d61d
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 98 deletions.
Expand Up @@ -19,10 +19,9 @@
*/
package org.neo4j.cypher

import java.io.{PrintStream, PrintWriter, StringWriter}
import java.io.{PrintWriter, StringWriter}

import org.neo4j.graphdb.{NotFoundException, TransactionFailureException}
import org.neo4j.io.NullOutputStream

import scala.language.reflectiveCalls

Expand All @@ -35,7 +34,7 @@ class DeleteConcurrencyIT extends ExecutionEngineFunSuite {

val threadNum = 2
val threads: List[MyThread] = (0 until threadNum).map { ignored =>
new MyThread(() => {
new MyThread(1, () => {
execute(s"MATCH (root) WHERE ID(root) = 0 DELETE root").toList
})
}.toList
Expand All @@ -59,7 +58,7 @@ class DeleteConcurrencyIT extends ExecutionEngineFunSuite {

val threadNum = 2
val threads: List[MyThread] = (0 until threadNum).map { ignored =>
new MyThread(() => {
new MyThread(1, () => {
execute(s"MATCH ()-[r:FRIEND]->() WHERE ID(r) = 0 DELETE r").toList
})
}.toList
Expand All @@ -82,11 +81,11 @@ class DeleteConcurrencyIT extends ExecutionEngineFunSuite {
}
val concurrency = 30
val threads: List[MyThread] = (0 until concurrency).map { ignored =>
new MyThread(() => {
new MyThread(1, () => {
execute(s"MATCH ()-[r]->() WITH r DELETE r").toList
})
}.toList ++ (0 until concurrency).map { ignored =>
new MyThread(() => {
new MyThread(1, () => {
execute(s"MATCH (p1), (p2) WHERE id(p1) < id(p2) CREATE (p2)-[:T]->(p1)").toList
})
}.toList
Expand Down Expand Up @@ -116,7 +115,7 @@ class DeleteConcurrencyIT extends ExecutionEngineFunSuite {
}

val threads: List[MyThread] = ids.map { id =>
new MyThread(() => {
new MyThread(1, () => {
execute(s"MATCH (root)-[:name]->(b) WHERE ID(root) = $id DETACH DELETE b").toList
})
}
Expand All @@ -134,76 +133,78 @@ class DeleteConcurrencyIT extends ExecutionEngineFunSuite {
}

test("detach delete should be atomic") {
val originalErr = System.err
System.setErr(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM)) // let's not spam!
val NUM_NODES = 13
val NUM_EXECUTIONS = 10

try {
val nodes = 10
val ids = {
val ids = graph.inTx {
(0 until nodes).map(ignored => execute("CREATE (n:person) RETURN ID(n) as id").columnAs[Long]("id").next()).toList
(0 until NUM_NODES).map(ignored => execute("CREATE (n:person) RETURN ID(n) as id").columnAs[Long]("id").next()).toList
}
new scala.util.Random(41).shuffle(ids)
}

graph.inTx {
ids.foreach { id =>
execute(s"MATCH (a) WHERE ID(a) = $id MERGE (b:person_name {val:'Bob Smith'}) CREATE (a)-[r:name]->(b)").toList
}
graph.inTx {
ids.foreach { id =>
execute(s"MATCH (a) WHERE ID(a) = $id MERGE (b:person_name {val:'Bob Smith'}) CREATE (a)-[r:name]->(b)").toList
}
}

val threads: List[MyThread] = ids.map { id =>
new MyThread(() => {
execute(s"MATCH (root)-[:name]->(b) WHERE ID(root) = $id DETACH DELETE b").toList
})
} ++ ids.map { id =>
new MyThread(() => {
try {
execute(s"MATCH (root)-[:name]->(b) WHERE ID(root) = $id CREATE (root)-[:name]->(b)").toList
} catch {
case _: NotFoundException => // ignore if we cannot create the relationship if b has been deleted
case _: CypherExecutionException => // ignore if we cannot create the relationship if b has been deleted
}
}, ignoreException = {
// let's ignore the commit failures if they are caused by the above exceptions
case ex: TransactionFailureException =>
val cause: Throwable = ex.getCause
cause.isInstanceOf[org.neo4j.kernel.api.exceptions.TransactionFailureException] &&
cause.getMessage == "Transaction rolled back even if marked as successful"
case ex: CypherExecutionException =>
ex.getCause.isInstanceOf[org.neo4j.kernel.api.exceptions.EntityNotFoundException] &&
ex.getCause.getMessage.startsWith("Unable to load NODE with id")
case _ => false
})
}
val threads: List[MyThread] = ids.map { id =>
new MyThread(NUM_EXECUTIONS, () => {
execute(s"MATCH (root)-[:name]->(b) WHERE ID(root) = $id DETACH DELETE b").toList
})
} ++ ids.map { id =>
new MyThread(NUM_EXECUTIONS, () => {
execute(s"MATCH (root)-[:name]->(b) WHERE ID(root) = $id CREATE (root)-[:name]->(b)").toList
}, ignoreRollbackAndNodeNotFound)
}

threads.foreach(_.start())
threads.foreach(_.join())
threads.foreach(_.start())
threads.foreach(_.join())

val errors = threads.collect {
case t if t.exception != null => t.exception
}
val errors = threads.collect {
case t if t.exception != null => t.exception
}

withClue(prettyPrintErrors(errors)) {
errors shouldBe empty
}
} finally System.setErr(originalErr)
withClue(prettyPrintErrors(errors)) {
errors shouldBe empty
}
}

/**
 * Predicate deciding whether an exception thrown by a worker thread may be ignored.
 *
 * The relationship-creating workers race against workers that DETACH DELETE the
 * target node, so failing because that node is gone is an accepted outcome and
 * must not fail the test.
 *
 * Returns `true` for:
 *  - a `TransactionFailureException` whose cause is a kernel
 *    `TransactionFailureException` carrying the exact "rolled back even if marked
 *    as successful" message;
 *  - a `CypherExecutionException` whose cause is a kernel `EntityNotFoundException`
 *    whose message starts with "Unable to load NODE with id";
 *  - any `NotFoundException`.
 *
 * Returns `false` for everything else, including the first two cases when the
 * cause is missing or of another type.
 */
val ignoreRollbackAndNodeNotFound: Throwable => Boolean = {
  case ex: TransactionFailureException =>
    // A typed pattern never matches null, so a missing cause falls through to false.
    ex.getCause match {
      case cause: org.neo4j.kernel.api.exceptions.TransactionFailureException =>
        cause.getMessage == "Transaction rolled back even if marked as successful"
      case _ => false
    }
  case ex: CypherExecutionException =>
    ex.getCause match {
      case cause: org.neo4j.kernel.api.exceptions.EntityNotFoundException =>
        // getMessage may be null; an NPE here would escape the worker's catch
        // handler and hide the original failure, so treat null as "no match".
        Option(cause.getMessage).exists(_.startsWith("Unable to load NODE with id"))
      case _ => false
    }
  case _: NotFoundException => true
  case _ => false
}

/**
 * Renders the given errors as one string suitable for a test-failure clue.
 *
 * Each error contributes its full stack trace followed by one blank line.
 * An empty sequence yields an empty string.
 *
 * @param errors the throwables to render, in the order they should appear
 * @return the concatenated stack traces
 */
private def prettyPrintErrors(errors: Seq[Throwable]): String = {
  val buffer = new StringWriter()
  val out = new PrintWriter(buffer)
  for (error <- errors) {
    error.printStackTrace(out)
    // blank separator line between consecutive traces
    out.println()
  }
  buffer.toString
}

private class MyThread(f: () => Unit, ignoreException: (Throwable) => Boolean = _ => false) extends Thread {
private var ex: Throwable = null
private class MyThread(numExecutions:Int, f: () => Unit, ignoreException: (Throwable) => Boolean = _ => false) extends Thread {
private var ex: Throwable = _

def exception: Throwable = ex

override def run() {
try {
graph.inTx { f() }
} catch {
case ex: Throwable if !ignoreException(ex) => this.ex = ex
for ( i <- 0 until numExecutions ) {
try {
graph.inTx { f() }
} catch {
case ex: Throwable =>
if (!ignoreException(ex))
this.ex = ex
}
}
}
}
Expand Down
Expand Up @@ -19,30 +19,35 @@
*/
package org.neo4j.kernel.impl.api;

import java.util.Arrays;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveArrays;
import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.cursor.Cursor;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.impl.api.operations.EntityReadOperations;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.RelationshipItem;

import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_RELATIONSHIP;

class TwoPhaseNodeForRelationshipLocking
{
private final PrimitiveLongSet nodeIds = Primitive.longSet();
private final EntityReadOperations entityReadOperations;
private final EntityReadOperations ops;
private final ThrowingConsumer<Long,KernelException> relIdAction;

private long firstRelId;
private long[] sortedNodeIds;

TwoPhaseNodeForRelationshipLocking( EntityReadOperations entityReadOperations,
ThrowingConsumer<Long,KernelException> relIdAction )
{
this.entityReadOperations = entityReadOperations;
this.ops = entityReadOperations;
this.relIdAction = relIdAction;
}

Expand All @@ -51,24 +56,18 @@ void lockAllNodesAndConsumeRelationships( long nodeId, final KernelStatement sta
boolean retry;
do
{
nodeIds.add( nodeId );
retry = false;
firstRelId = -1;
firstRelId = NO_SUCH_RELATIONSHIP;

// lock all the nodes involved by following the node id ordering
try ( Cursor<NodeItem> node = entityReadOperations.nodeCursorById( state, nodeId ) )
{
entityReadOperations.nodeGetRelationships( state, node.get(), Direction.BOTH )
.forAll( this::collectNodeId );
}
collectAndSortNodeIds( nodeId, state );

lockAllNodes( state );
lockAllNodes( state, sortedNodeIds );

// perform the action on each relationship, we will retry if the relationship iterator contains new relationships
try ( Cursor<NodeItem> node = entityReadOperations.nodeCursorById( state, nodeId ) )
try ( Cursor<NodeItem> node = ops.nodeCursorById( state, nodeId ) )
{
try ( Cursor<RelationshipItem> relationships = entityReadOperations
.nodeGetRelationships( state, node.get(), Direction.BOTH ) )
try ( Cursor<RelationshipItem> relationships = ops.nodeGetRelationships( state, node.get(), Direction.BOTH ) )
{
boolean first = true;
while ( relationships.next() && !retry )
Expand All @@ -82,24 +81,48 @@ void lockAllNodesAndConsumeRelationships( long nodeId, final KernelStatement sta
while ( retry );
}

private void lockAllNodes( KernelStatement state )
/**
 * Gathers the ids of every node that must be locked for {@code nodeId} and stores
 * them, sorted in ascending order, in {@link #sortedNodeIds}.
 *
 * The collected set contains {@code nodeId} itself plus the start and end node of
 * each relationship returned for it in both directions. While iterating, the id of
 * the first relationship seen is recorded in {@code firstRelId}, unless that field
 * already holds something other than {@code NO_SUCH_RELATIONSHIP}.
 *
 * @param nodeId the node whose relationships are scanned
 * @param state the statement used for the cursor lookups
 * @throws EntityNotFoundException declared for the node/relationship cursor lookups
 */
private void collectAndSortNodeIds( long nodeId, KernelStatement state ) throws EntityNotFoundException
{
    PrimitiveLongSet involved = Primitive.longSet();
    involved.add( nodeId );

    // Both cursors are closed in reverse order of declaration, exactly as nested try blocks would.
    try ( Cursor<NodeItem> nodeCursor = ops.nodeCursorById( state, nodeId );
          Cursor<RelationshipItem> relCursor =
                  ops.nodeGetRelationships( state, nodeCursor.get(), Direction.BOTH ) )
    {
        while ( relCursor.next() )
        {
            RelationshipItem current = relCursor.get();
            boolean firstNotYetRecorded = firstRelId == NO_SUCH_RELATIONSHIP;
            if ( firstNotYetRecorded )
            {
                firstRelId = current.id();
            }

            involved.add( current.startNode() );
            involved.add( current.endNode() );
        }
    }

    // Sort so that callers can acquire locks in ascending node id order.
    long[] ascending = PrimitiveArrays.of( involved );
    Arrays.sort( ascending );
    sortedNodeIds = ascending;
}

private void lockAllNodes( KernelStatement state, long[] nodeIds )
{
PrimitiveLongIterator nodeIdIterator = nodeIds.iterator();
while ( nodeIdIterator.hasNext() )
for ( long nodeId : nodeIds )
{
state.locks().optimistic()
.acquireExclusive( state.lockTracer(), ResourceTypes.NODE, nodeIdIterator.next() );
state.locks().optimistic().acquireExclusive( state.lockTracer(), ResourceTypes.NODE, nodeId );
}
}

private void unlockAllNodes( KernelStatement state )
private void unlockAllNodes( KernelStatement state, long[] nodeIds )
{
PrimitiveLongIterator iterator = nodeIds.iterator();
while ( iterator.hasNext() )
for ( long nodeId : nodeIds )
{
state.locks().optimistic().releaseExclusive( ResourceTypes.NODE, iterator.next() );
state.locks().optimistic().releaseExclusive( ResourceTypes.NODE, nodeId );
}
nodeIds.clear();
}

private boolean performAction( KernelStatement state, RelationshipItem rel, boolean first ) throws KernelException
Expand All @@ -110,23 +133,13 @@ private boolean performAction( KernelStatement state, RelationshipItem rel, bool
{
// if the first relationship is not the same someone added some new rels, so we need to
// lock them all again
unlockAllNodes( state );
unlockAllNodes( state, sortedNodeIds );
sortedNodeIds = null;
return true;
}
}

relIdAction.accept( rel.id() );
return false;
}

private void collectNodeId( RelationshipItem rel )
{
if ( firstRelId == -1 )
{
firstRelId = rel.id();
}

nodeIds.add( rel.startNode() );
nodeIds.add( rel.endNode() );
}
}
Expand Up @@ -43,14 +43,18 @@ public class StoreNodeRelationshipCursor extends StoreAbstractRelationshipCursor
{
private final RelationshipGroupRecord groupRecord;
private final Consumer<StoreNodeRelationshipCursor> instanceCache;
private final RecordCursors cursors;

// Reset all this state on init()
// --------
private boolean isDense;
private long relationshipId;
private long fromNodeId;
private Direction direction;
private IntPredicate allowedTypes;
private int groupChainIndex;
private boolean end;
private final RecordCursors cursors;
// --------

public StoreNodeRelationshipCursor( RelationshipRecord relationshipRecord,
RelationshipGroupRecord groupRecord,
Expand All @@ -75,6 +79,7 @@ public StoreNodeRelationshipCursor init( boolean isDense,
this.fromNodeId = fromNodeId;
this.direction = direction;
this.allowedTypes = allowedTypes;
this.groupChainIndex = 0;
this.end = false;

if ( isDense && relationshipId != Record.NO_NEXT_RELATIONSHIP.intValue() )
Expand Down
Expand Up @@ -70,22 +70,28 @@ public void shouldLockNodesInOrderAndConsumeTheRelationships() throws Throwable
Collector collector = new Collector();
TwoPhaseNodeForRelationshipLocking locking = new TwoPhaseNodeForRelationshipLocking( ops, collector );

RelationshipData relationship1 = new RelationshipData( 21L, nodeId, 43L );
RelationshipData relationship2 = new RelationshipData( 22L, 40L, nodeId );
RelationshipData relationship3 = new RelationshipData( 23L, nodeId, 41L );
returnRelationships( ops, state, nodeId, false, relationship1, relationship2, relationship3 );
returnRelationships(
ops, state, nodeId, false,
new RelationshipData( 21L, nodeId, 43L ),
new RelationshipData( 22L, 40L, nodeId ),
new RelationshipData( 23L, nodeId, 41L ),
new RelationshipData( 2L, nodeId, 3L ),
new RelationshipData( 3L, 49L, nodeId ),
new RelationshipData( 50L, nodeId, 41L ) );

InOrder inOrder = inOrder( locks );

// when
locking.lockAllNodesAndConsumeRelationships( nodeId, state );

// then
inOrder.verify( locks ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 3L );
inOrder.verify( locks ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 40L );
inOrder.verify( locks ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 41L );
inOrder.verify( locks ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, nodeId );
inOrder.verify( locks ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 43L );
assertEquals( set( 21L, 22L, 23L ), collector.set );
inOrder.verify( locks ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 49L );
assertEquals( set( 21L, 22L, 23L, 2L, 3L, 50L ), collector.set );
}

@Test
Expand Down

0 comments on commit 842d61d

Please sign in to comment.