Skip to content

Commit

Permalink
Takes read locks while matching on the database
Browse files Browse the repository at this point in the history
Decorates the SPI, taking read locks every time a graph element is passed through the SPI. We also wrap the result iterator, taking care to close it when QueryContext once the iterator is empty, so the read locks can be released.
  • Loading branch information
systay committed Nov 10, 2012
1 parent d210dc7 commit 2d17082
Show file tree
Hide file tree
Showing 10 changed files with 840 additions and 92 deletions.
@@ -0,0 +1,76 @@
/**
* Copyright (c) 2002-2012 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.spi.gdsimpl;

import java.util.HashMap;
import java.util.Map;

import org.neo4j.graphdb.Lock;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Transaction;

public class GDSBackedLocker implements RepeatableReadQueryContext.Locker
{
private final Transaction transaction;
private final Map<Long, Lock> nodeLocks = new HashMap<Long, Lock>();
private final Map<Long, Lock> relationshipLocks = new HashMap<Long, Lock>();

public GDSBackedLocker( Transaction transaction )
{
this.transaction = transaction;
}

@Override
public void readLock( PropertyContainer pc )
{
if ( pc instanceof Node )
{
lock( pc, ((Node) pc).getId(), nodeLocks );
} else
{
lock( pc, ((Relationship) pc).getId(), relationshipLocks );
}
}

private void lock( PropertyContainer pc, long id, Map<Long, Lock> lockHolder )
{
if ( !lockHolder.containsKey( id ) )
{
Lock lock = transaction.acquireReadLock( pc );
relationshipLocks.put( id, lock );
}
}

@Override
public void releaseAllReadLocks()
{
for(Lock lock : nodeLocks.values())
{
lock.release();
}

for(Lock lock : relationshipLocks.values())
{
lock.release();
}
}
}
@@ -0,0 +1,166 @@
/**
* Copyright (c) 2002-2012 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.spi.gdsimpl;

import org.neo4j.cypher.internal.spi.QueryContext;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.graphdb.Relationship;
import org.neo4j.helpers.collection.IterableWrapper;

/**
* This QueryContext is responsible for taking read locks for all operations that read from the database.
* <p/>
* The close() method will then release all locks.
*/
public class RepeatableReadQueryContext implements QueryContext
{

public interface Locker
{
void readLock( PropertyContainer pc );

void releaseAllReadLocks();
}

private final QueryContext inner;
private final Locker locker;
private final Operations<Node> nodeOps;
private final Operations<Relationship> relOps;

public RepeatableReadQueryContext( QueryContext inner, Locker locker )
{
this.inner = inner;
this.locker = locker;
this.nodeOps = new LockingOperations<Node>( inner.nodeOps() );
this.relOps = new LockingOperations<Relationship>( inner.relationshipOps() );
}

@Override
public Operations<Node> nodeOps()
{
return nodeOps;
}

@Override
public Operations<Relationship> relationshipOps()
{
return relOps;
}

@Override
public Node createNode()
{
return inner.createNode();
}

@Override
public Relationship createRelationship( Node start, Node end, String relType )
{
return inner.createRelationship( start, end, relType );
}

@Override
public Iterable<Relationship> getRelationshipsFor( Node node, Direction dir, String... types )
{
locker.readLock( node );
Iterable<Relationship> iter = inner.getRelationshipsFor( node, dir, types );
return new LockingIterator( iter );
}

@Override
public void close()
{
locker.releaseAllReadLocks();
}

private class LockingIterator extends IterableWrapper<Relationship, Relationship>
{
public LockingIterator( Iterable<Relationship> iterableToWrap )
{
super( iterableToWrap );
}

@Override
protected Relationship underlyingObjectToObject( Relationship rel )
{
locker.readLock( rel );
return rel;
}
}

private class LockingOperations<T extends PropertyContainer> implements Operations<T>
{
private final Operations<T> inner;

private LockingOperations( Operations<T> inner )
{
this.inner = inner;
}

@Override
public void delete( T obj )
{
inner.delete( obj );
}

@Override
public void setProperty( T obj, String propertyKey, Object value )
{
inner.setProperty( obj, propertyKey, value );
}

@Override
public void removeProperty( T obj, String propertyKey )
{
inner.removeProperty( obj, propertyKey );
}

@Override
public Object getProperty( T obj, String propertyKey )
{
locker.readLock( obj );
return inner.getProperty( obj, propertyKey );
}

@Override
public boolean hasProperty( T obj, String propertyKey )
{
locker.readLock( obj );
return inner.hasProperty( obj, propertyKey );
}

@Override
public Iterable<String> propertyKeys( T obj )
{
locker.readLock( obj );
return inner.propertyKeys( obj );
}

@Override
public T getById( long id )
{
T obj = inner.getById( id );
locker.readLock( obj );
return obj;
}
}
}
@@ -0,0 +1,94 @@
/**
* Copyright (c) 2002-2012 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal

import spi.QueryContext
import org.neo4j.graphdb.{TransactionFailureException, Transaction}
import org.neo4j.kernel.impl.nioneo.store.ConstraintViolationException
import org.neo4j.cypher.NodeStillHasRelationshipsException

/**
* An iterator that decorates an inner iterator, and calls close() on the QueryContext once
* the inner iterator is empty.
*/
class ClosingIterator[T](inner: Iterator[T], queryContext: QueryContext, tx: Transaction) extends Iterator[T] {
private var closed: Boolean = false
lazy val still_has_relationships = "Node record Node\\[(\\d),.*] still has relationships".r

def hasNext: Boolean = failIfThrows {
val innerHasNext: Boolean = inner.hasNext
if (!innerHasNext) {
close()
}
innerHasNext
}


def next(): T = failIfThrows {
val result: T = inner.next()
if (!inner.hasNext) {
close()
}
result
}

private def close() {
translateException {
if (!closed) {
closed = true
queryContext.close()
}
tx.success()
tx.finish()
}
}

private def translateException[U](f: => U): U = try {
f
} catch {
case e: TransactionFailureException => {

var cause:Throwable = e
while(cause.getCause != null)
{
cause = cause.getCause
if(cause.isInstanceOf[ConstraintViolationException])
{
cause.getMessage match {
case still_has_relationships(id) => throw new NodeStillHasRelationshipsException(id.toLong, e)
case _ => throw e
}
}
}

throw e
}
}


private def failIfThrows[U](f: => U): U = try {
f
} catch {
case t: Throwable if !closed =>
tx.failure()
tx.finish()
throw t
}
}
Expand Up @@ -21,6 +21,7 @@ package org.neo4j.cypher.internal.commands.expressions


import org.neo4j.cypher.CypherTypeException import org.neo4j.cypher.CypherTypeException
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import collection.Map
import org.neo4j.cypher.internal.helpers.{IsCollection, CollectionSupport} import org.neo4j.cypher.internal.helpers.{IsCollection, CollectionSupport}
import org.neo4j.graphdb.{PropertyContainer, Relationship, Node} import org.neo4j.graphdb.{PropertyContainer, Relationship, Node}
import org.neo4j.cypher.internal.symbols._ import org.neo4j.cypher.internal.symbols._
Expand Down Expand Up @@ -132,8 +133,8 @@ case class SubstringFunction(orig: Expression, start: Expression, length: Expres
def calculateType(symbols: SymbolTable) = StringType() def calculateType(symbols: SymbolTable) = StringType()


def symbolTableDependencies = orig.symbolTableDependencies ++ def symbolTableDependencies = orig.symbolTableDependencies ++
start.symbolTableDependencies ++ start.symbolTableDependencies ++
length.symbolTableDependencies length.symbolTableDependencies
} }


case class ReplaceFunction(orig: Expression, search: Expression, replaceWith: Expression) extends NullInNullOutExpression(orig) with StringHelper { case class ReplaceFunction(orig: Expression, search: Expression, replaceWith: Expression) extends NullInNullOutExpression(orig) with StringHelper {
Expand Down Expand Up @@ -164,8 +165,8 @@ case class ReplaceFunction(orig: Expression, search: Expression, replaceWith: Ex
def calculateType(symbols: SymbolTable) = StringType() def calculateType(symbols: SymbolTable) = StringType()


def symbolTableDependencies = orig.symbolTableDependencies ++ def symbolTableDependencies = orig.symbolTableDependencies ++
search.symbolTableDependencies ++ search.symbolTableDependencies ++
replaceWith.symbolTableDependencies replaceWith.symbolTableDependencies
} }


case class LeftFunction(orig: Expression, length: Expression) extends NullInNullOutExpression(orig) with StringHelper with NumericHelper { case class LeftFunction(orig: Expression, length: Expression) extends NullInNullOutExpression(orig) with StringHelper with NumericHelper {
Expand Down Expand Up @@ -193,7 +194,7 @@ case class LeftFunction(orig: Expression, length: Expression) extends NullInNull
def calculateType(symbols: SymbolTable) = StringType() def calculateType(symbols: SymbolTable) = StringType()


def symbolTableDependencies = orig.symbolTableDependencies ++ def symbolTableDependencies = orig.symbolTableDependencies ++
length.symbolTableDependencies length.symbolTableDependencies
} }


case class RightFunction(orig: Expression, length: Expression) extends NullInNullOutExpression(orig) with StringHelper with NumericHelper { case class RightFunction(orig: Expression, length: Expression) extends NullInNullOutExpression(orig) with StringHelper with NumericHelper {
Expand Down Expand Up @@ -221,5 +222,5 @@ case class RightFunction(orig: Expression, length: Expression) extends NullInNul
def calculateType(symbols: SymbolTable) = StringType() def calculateType(symbols: SymbolTable) = StringType()


def symbolTableDependencies = orig.symbolTableDependencies ++ def symbolTableDependencies = orig.symbolTableDependencies ++
length.symbolTableDependencies length.symbolTableDependencies
} }

0 comments on commit 2d17082

Please sign in to comment.