Skip to content

Commit

Permalink
Ensure Bolt throws transient exceptions when read transactions confli…
Browse files Browse the repository at this point in the history
…ct with overlapping delete transactions.

Previously, the bolt server would just pass through whatever random exception bubbled up through the kernel.
Now, the RelationshipProxy and NodeProxy go to greater length to consistently throw IllegalStateException,
and the EntityWrappingEntityValue classes then translate those exceptions into a new
ReadAndDeleteTransactionConflictException, which on the driver-end will be turned into a transient exception.
The transient exception instructs the clients to retry their transactions.
  • Loading branch information
chrisvest committed Apr 9, 2019
1 parent 643cc25 commit 73ddf5d
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 9 deletions.
Expand Up @@ -58,6 +58,7 @@
import org.neo4j.kernel.api.SilentTokenNameLookup;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.store.InvalidRecordException;
import org.neo4j.storageengine.api.EntityType;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.Values;
Expand Down Expand Up @@ -654,6 +655,11 @@ public Iterable<Label> getLabels()
{
throw new IllegalStateException( "Label retrieved through kernel API should exist.", e );
}
catch ( InvalidRecordException e )
{
// This can happen if the labels are stored in a dynamic record, and the node is deleted by an overlapping committed transaction.
throw new IllegalStateException( "This node might have been deleted by an overlapping committed transaction.", e );
}
}

@Override
Expand Down
Expand Up @@ -81,23 +81,26 @@ public final void visit( long id, int type, long startNode, long endNode ) throw
this.endNode = endNode;
}

private void initializeData()
public boolean initializeData()
{
// it enough to check only start node, since it's absence will indicate that data was not yet loaded
// It enough to check only start node, since it's absence will indicate that data was not yet loaded.
if ( startNode == AbstractBaseRecord.NO_ID )
{
KernelTransaction transaction = spi.kernelTransaction();
try ( Statement ignore = transaction.acquireStatement() )
{
RelationshipScanCursor relationships = transaction.ambientRelationshipCursor();
transaction.dataRead().singleRelationship( id, relationships );
//at this point we don't care if it is there or not just load what we got
relationships.next();
// At this point we don't care if it is there or not just load what we got.
boolean wasPresent = relationships.next();
this.type = relationships.type();
this.startNode = relationships.sourceNodeReference();
this.endNode = relationships.targetNodeReference();
// But others might care, e.g. the Bolt server needs to know for serialisation purposes.
return wasPresent;
}
}
return true;
}

@Override
Expand Down
Expand Up @@ -62,7 +62,10 @@ public <E extends Exception> void writeTo( AnyValueWriter<E> writer ) throws E
{
l = Values.stringArray();
p = VirtualValues.EMPTY_MAP;

}
catch ( IllegalStateException e )
{
throw new ReadAndDeleteTransactionConflictException( e );
}

if ( id() < 0 )
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;

import org.neo4j.kernel.api.exceptions.Status;

public class ReadAndDeleteTransactionConflictException extends RuntimeException implements Status.HasStatus
{
private static final String MESSAGE = "Database elements (nodes, relationships, properties) were observed during query execution, but got deleted by an " +
"overlapping committed transaction before the query results could be serialised. The transaction might succeed if it is retried.";

public ReadAndDeleteTransactionConflictException()
{
super( MESSAGE );
}

public ReadAndDeleteTransactionConflictException( Throwable cause )
{
super( MESSAGE, cause );
}

@Override
public Status status()
{
return Status.Transaction.Outdated;
}
}
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.Relationship;
import org.neo4j.kernel.impl.core.RelationshipProxy;
import org.neo4j.values.AnyValueWriter;
import org.neo4j.values.storable.TextValue;
import org.neo4j.values.storable.Values;
Expand Down Expand Up @@ -53,6 +54,17 @@ public Relationship relationshipProxy()
@Override
public <E extends Exception> void writeTo( AnyValueWriter<E> writer ) throws E
{
if ( relationship instanceof RelationshipProxy )
{
RelationshipProxy proxy = (RelationshipProxy) relationship;
if ( !proxy.initializeData() )
{
// If the relationship has been deleted since it was found by the query, then we'll have to tell the client that their transaction conflicted,
// and that they need to retry it.
throw new ReadAndDeleteTransactionConflictException();
}
}

MapValue p;
try
{
Expand All @@ -61,7 +73,10 @@ public <E extends Exception> void writeTo( AnyValueWriter<E> writer ) throws E
catch ( NotFoundException e )
{
p = VirtualValues.EMPTY_MAP;

}
catch ( IllegalStateException e )
{
throw new ReadAndDeleteTransactionConflictException( e );
}

if ( id() < 0 )
Expand Down
Expand Up @@ -27,7 +27,9 @@
import java.io.PrintStream;
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;

import org.neo4j.function.Suppliers;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.config.Configuration;
import org.neo4j.graphdb.config.Setting;
Expand All @@ -47,7 +49,7 @@ public class Neo4jRule implements TestRule, TestServerBuilder
{
private TestServerBuilder builder;
private ServerControls controls;
private PrintStream dumpLogsOnFailureTarget;
private Supplier<PrintStream> dumpLogsOnFailureTarget;

Neo4jRule( TestServerBuilder builder )
{
Expand Down Expand Up @@ -82,7 +84,7 @@ public void evaluate() throws Throwable
{
if ( dumpLogsOnFailureTarget != null )
{
sc.printLogs( dumpLogsOnFailureTarget );
sc.printLogs( dumpLogsOnFailureTarget.get() );
}

throw t;
Expand Down Expand Up @@ -176,6 +178,12 @@ public Neo4jRule withAggregationFunction( Class<?> functionClass )
}

public Neo4jRule dumpLogsOnFailure( PrintStream out )
{
dumpLogsOnFailureTarget = () -> out;
return this;
}

public Neo4jRule dumpLogsOnFailure( Supplier<PrintStream> out )
{
dumpLogsOnFailureTarget = out;
return this;
Expand Down
Expand Up @@ -44,7 +44,7 @@ public class ProcedureTest
public final SuppressOutput suppressOutput = SuppressOutput.suppressAll();
@Rule
public Neo4jRule graphDb = new Neo4jRule()
.dumpLogsOnFailure( System.out )
.dumpLogsOnFailure( () -> System.out ) // Late-bind to System.out to work better with SuppressOutput rule.
.withProcedure( PROCEDURES_CLASS );
private String procedureNamespace = PROCEDURES_CLASS.getPackage().getName();

Expand Down

0 comments on commit 73ddf5d

Please sign in to comment.