Skip to content

Commit

Permalink
Make SlaveLockClient throw DistributedLockFailureException on master …
Browse files Browse the repository at this point in the history
…communication failures

And also add a bunch of tests for these failure modes.

The DistributedLockFailureException extends the user-facing TransientTransactionFailureExcpetion, so user-code can actually catch and deal with it.
It is also more descriptive, because a ComException doesn't really say much that is actually useful and actionable to the user.
Especially since they have no idea that it can happen in the first place, and - it being a RuntimeException - pop up pretty much anywhere.
  • Loading branch information
chrisvest committed Dec 15, 2015
1 parent 66178b9 commit db7229f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 25 deletions.
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha.lock;

import org.neo4j.com.ComException;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.ha.com.master.Master;

/**
* Thrown upon network communication failures, when taking or releasing distributed locks in HA.
*/
public class DistributedLockFailureException extends TransientTransactionFailureException
{
public DistributedLockFailureException( String message, Master master, ComException cause )
{
super( message + " (for master instance " + master + "). The most common causes of this exception are " +
"network failures, or master-switches where the failing transaction was started before the last " +
"master election.", cause );
}
}
Expand Up @@ -24,11 +24,12 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext; import org.neo4j.com.RequestContext;
import org.neo4j.com.Response; import org.neo4j.com.Response;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.DeadlockDetectedException; import org.neo4j.kernel.DeadlockDetectedException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.locking.AcquireLockTimeoutException; import org.neo4j.kernel.impl.locking.AcquireLockTimeoutException;
Expand Down Expand Up @@ -182,7 +183,7 @@ public void releaseShared( Locks.ResourceType resourceType, long... resourceIds
AtomicInteger counter = lockMap.get( resourceId ); AtomicInteger counter = lockMap.get( resourceId );
if ( counter == null ) if ( counter == null )
{ {
throw new IllegalStateException( this + " cannot release lock it does not hold: EXCLUSIVE " + throw new IllegalStateException( this + " cannot release lock it does not hold: SHARED " +
resourceType + "[" + resourceId + "]" ); resourceType + "[" + resourceId + "]" );
} }
if ( counter.decrementAndGet() == 0 ) if ( counter.decrementAndGet() == 0 )
Expand Down Expand Up @@ -225,15 +226,27 @@ public void releaseAll()
{ {
// Lock session is closed on master at this point // Lock session is closed on master at this point
} }
catch ( ComException e )
{
throw new DistributedLockFailureException(
"Failed to end the lock session on the master (which implies releasing all held locks)",
master, e );
}
initialized = false; initialized = false;
} }
} }


@Override @Override
public void close() public void close()
{ {
releaseAll(); try
client.close(); {
releaseAll();
}
finally
{
client.close();
}
} }


@Override @Override
Expand All @@ -256,6 +269,10 @@ private boolean getReadLockOnMaster( Locks.ResourceType resourceType, long ... r
{ {
return receiveLockResponse( response ); return receiveLockResponse( response );
} }
catch ( ComException e )
{
throw new DistributedLockFailureException( "Cannot get shared lock on master", master, e );
}
} }
else else
{ {
Expand All @@ -271,6 +288,10 @@ private boolean acquireExclusiveOnMaster( Locks.ResourceType resourceType, long.
{ {
return receiveLockResponse( response ); return receiveLockResponse( response );
} }
catch ( ComException e )
{
throw new DistributedLockFailureException( "Cannot get exclusive lock on master", master, e );
}
} }


private boolean receiveLockResponse( Response<LockResult> response ) private boolean receiveLockResponse( Response<LockResult> response )
Expand All @@ -294,18 +315,27 @@ private boolean receiveLockResponse( Response<LockResult> response )


private void makeSureTxHasBeenInitialized() private void makeSureTxHasBeenInitialized()
{ {
availabilityGuard.checkAvailability( availabilityTimeoutMillis, RuntimeException.class ); availabilityGuard.checkAvailability( availabilityTimeoutMillis, TransactionFailureException.class );
if ( !initialized ) if ( !initialized )
{ {
try ( Response<Void> ignored = master.newLockSession( newRequestContextFor( client ) ) ) try ( Response<Void> ignored = master.newLockSession( newRequestContextFor( client ) ) )
{ {
// Lock session is initialized on master at this point // Lock session is initialized on master at this point
} }
catch ( TransactionFailureException e ) catch ( Exception exception )
{ {
// Temporary wrapping, we should review the exception structure of the Locks API to allow this to // Temporary wrapping, we should review the exception structure of the Locks API to allow this to
// not use runtime exceptions here. // not use runtime exceptions here.
throw new org.neo4j.graphdb.TransactionFailureException( "Failed to acquire lock in cluster: " + e.getMessage(), e ); ComException e;
if ( exception instanceof ComException )
{
e = (ComException) exception;
}
else
{
e = new ComException( exception );
}
throw new DistributedLockFailureException( "Failed to start a new lock session on master", master, e );
} }
initialized = true; initialized = true;
} }
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;


import org.neo4j.com.ComException;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.HaSettings; import org.neo4j.kernel.ha.HaSettings;
Expand All @@ -38,7 +37,7 @@


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;

import static org.neo4j.helpers.Exceptions.contains;
import static org.neo4j.kernel.impl.ha.ClusterManager.fromXml; import static org.neo4j.kernel.impl.ha.ClusterManager.fromXml;


public class ClusterTransactionTest public class ClusterTransactionTest
Expand Down Expand Up @@ -79,9 +78,9 @@ public Boolean call() throws Exception
{ {
tx.acquireWriteLock( slave.getNodeById( nodeId ) ); tx.acquireWriteLock( slave.getNodeById( nodeId ) );
} }
catch ( ComException e ) catch ( Exception e )
{ {
return e.getCause() instanceof TransactionFailureException; return contains( e, TransactionFailureException.class );
} }
// Fail otherwise // Fail otherwise
return false; return false;
Expand Down
Expand Up @@ -19,14 +19,14 @@
*/ */
package org.neo4j.ha; package org.neo4j.ha;


import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.neo4j.graphdb.DynamicLabel; import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Label;
Expand All @@ -36,6 +36,7 @@
import org.neo4j.graphdb.NotInTransactionException; import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException; import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.graphdb.schema.ConstraintDefinition; import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.kernel.DeadlockDetectedException; import org.neo4j.kernel.DeadlockDetectedException;
import org.neo4j.kernel.ha.HaSettings; import org.neo4j.kernel.ha.HaSettings;
Expand All @@ -51,12 +52,10 @@
import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;

import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;

import static org.neo4j.helpers.collection.IteratorUtil.single; import static org.neo4j.helpers.collection.IteratorUtil.single;
import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.allSeesAllAsAvailable;
import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable; import static org.neo4j.kernel.impl.ha.ClusterManager.masterAvailable;
Expand Down Expand Up @@ -458,7 +457,7 @@ private void assertFinishGetsTransactionFailure( Transaction tx )
tx.finish(); tx.finish();
fail( "Transaction shouldn't be able to finish" ); fail( "Transaction shouldn't be able to finish" );
} }
catch ( TransactionFailureException e ) catch ( TransientTransactionFailureException | TransactionFailureException e )
{ // Good { // Good
} }
} }
Expand Down

0 comments on commit db7229f

Please sign in to comment.