Skip to content

Commit

Permalink
Merge pull request #6153 from chrisvest/2.2-ha-slave-lock-switch
Browse files Browse the repository at this point in the history
Make SlaveTransactionCommitProcess turn ComExceptions into TransientTransactionFailureExceptions
  • Loading branch information
davidegrohmann committed Dec 22, 2015
2 parents e8e5306 + ef6e2d8 commit ada6fb5
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@

import java.io.IOException;

import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;

/**
* Commit process on slaves in HA. Transactions aren't committed here, but sent to the master, committed
Expand Down Expand Up @@ -61,7 +64,14 @@ public long commit( TransactionRepresentation representation, LockGroup locks, C
}
catch ( IOException e )
{
throw new RuntimeException( e );
throw new TransactionFailureException(
Status.Transaction.CouldNotCommit, e, "Could not commit transaction on the master" );
}
catch ( ComException e )
{
throw new TransientTransactionFailureException(
"Cannot commit this transaction on the master. " +
"The master is either down, or we have network connectivity problems.", e );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* This event represents a change in the cluster members internal state. The possible states
* are enumerated in ClusterMemberState.
* are enumerated in {@link HighAvailabilityMemberState}.
*/
public class HighAvailabilityMemberChangeEvent
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public HighAvailabilityMemberState masterIsElected( HighAvailabilityMemberContex
public HighAvailabilityMemberState masterIsAvailable( HighAvailabilityMemberContext context,
InstanceId masterId, URI masterHaURI )
{
// assert context.getAvailableMaster() == null;
if ( masterId.equals( context.getMyId() ) )
{
throw new RuntimeException( "Received a MasterIsAvailable event for my InstanceId while in" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
*/
package org.neo4j.ha;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
Expand All @@ -34,28 +36,34 @@
import org.neo4j.kernel.lifecycle.LifecycleListener;
import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.test.ha.ClusterRule;
import org.neo4j.tooling.GlobalGraphOperations;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.neo4j.helpers.Exceptions.contains;
import static org.neo4j.kernel.impl.ha.ClusterManager.fromXml;

public class ClusterTransactionTest
public class ClusterTransactionIT
{
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() );

@Test
public void givenClusterWhenShutdownMasterThenCannotStartTransactionOnSlave() throws Throwable
{
private ClusterManager.ManagedCluster cluster;

final ClusterManager.ManagedCluster cluster =
clusterRule.provider( fromXml( getClass().getResource( "/threeinstances.xml" ).toURI() ) )
.config( HaSettings.ha_server, ":6001-6005" )
.config( HaSettings.tx_push_factor, "2" ).startCluster();
/**
 * Starts the cluster described by the {@code /threeinstances.xml} classpath resource and blocks
 * until all members see each other as available, so that every test begins with a settled cluster.
 */
@Before
public void setUp() throws Exception
{
    // Resolve the cluster description up front to keep the builder chain below readable.
    java.net.URI clusterDescription = getClass().getResource( "/threeinstances.xml" ).toURI();

    cluster = clusterRule.provider( fromXml( clusterDescription ) )
            .config( HaSettings.ha_server, ":6001-6005" )
            .config( HaSettings.tx_push_factor, "2" )
            .startCluster();

    cluster.await( ClusterManager.allSeesAllAsAvailable() );
}

@Test
public void givenClusterWhenShutdownMasterThenCannotStartTransactionOnSlave() throws Throwable
{
final HighlyAvailableGraphDatabase master = cluster.getMaster();
final HighlyAvailableGraphDatabase slave = cluster.getAnySlave();

Expand Down Expand Up @@ -107,4 +115,50 @@ public void notifyStatusChanged( Object instance, LifecycleStatus from, Lifecycl
// Then
assertThat( result.get(), equalTo( true ) );
}

/**
 * Fails and repairs the initial master, then verifies that a slave can still commit a
 * transaction and that the cluster ends up with exactly three nodes.
 */
@Test
public void slaveMustConnectLockManagerToNewMasterAfterTwoOtherClusterMembersRoleSwitch() throws Throwable
{
    // Given
    final HighlyAvailableGraphDatabase originalMaster = cluster.getMaster();
    HighlyAvailableGraphDatabase slaveOne = cluster.getAnySlave();
    HighlyAvailableGraphDatabase slaveTwo = cluster.getAnySlave( slaveOne );

    // Commit once through each slave, to make sure that a master connection has been
    // initialised in all internal pools before the role switch happens.
    createNodeInOwnTransaction( slaveOne );
    createNodeInOwnTransaction( slaveTwo );
    cluster.sync();

    // When
    // NOTE(review): masterAvailable( x ) presumably waits for a master other than x - confirm
    // against ClusterManager.
    ClusterManager.RepairKit repairKit = cluster.fail( originalMaster );
    cluster.await( ClusterManager.masterAvailable( originalMaster ) );
    repairKit.repair();
    cluster.await( ClusterManager.masterAvailable( originalMaster ) );
    cluster.await( ClusterManager.allSeesAllAsAvailable() );

    // At this point the master role has moved to one of the slaves. The slave that did not
    // take over the role must nevertheless have re-established its connection to the new master.
    HighlyAvailableGraphDatabase survivingSlave = cluster.getAnySlave( originalMaster );
    createNodeInOwnTransaction( survivingSlave );

    // Then
    // The commit above must not have thrown, and the three commits must add up to three nodes.
    HighlyAvailableGraphDatabase currentMaster = cluster.getMaster();
    try ( Transaction tx = currentMaster.beginTx() )
    {
        GlobalGraphOperations allOperations = GlobalGraphOperations.at( currentMaster );
        assertThat( IteratorUtil.count( allOperations.getAllNodes() ), is( 3 ) );
    }
}

/**
 * Creates a single node on the given instance inside its own transaction and marks the
 * transaction as successful before it is closed.
 */
private static void createNodeInOwnTransaction( HighlyAvailableGraphDatabase db )
{
    try ( Transaction tx = db.beginTx() )
    {
        db.createNode();
        tx.success();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
*/
package org.neo4j.kernel.api;

import java.io.File;

import org.junit.Rule;
import org.junit.Test;

import java.io.File;

import org.neo4j.graphdb.InvalidTransactionTypeException;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.kernel.TopLevelTransaction;
import org.neo4j.kernel.ha.HaSettings;
Expand All @@ -37,13 +38,11 @@
import org.neo4j.test.ha.ClusterRule;

import static java.util.Arrays.asList;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import static org.neo4j.graphdb.DynamicLabel.label;
import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.helpers.collection.Iterables.single;
Expand Down Expand Up @@ -174,7 +173,7 @@ public void shouldNotAllowOldUncommittedTransactionsToResumeAndViolateConstraint
slaveTx.finish();
fail( "Expected this commit to fail :(" );
}
catch( TransactionFailureException e )
catch( TransactionFailureException | TransientTransactionFailureException e )
{
assertThat(e.getCause().getCause(), instanceOf( org.neo4j.kernel.api.exceptions.TransactionFailureException.class ));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;

import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.ResourceReleaser;
import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.test.ConstantRequestContextFactory;
import org.neo4j.test.IntegerResponse;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -101,32 +100,6 @@ public Response<Integer> callMasterMethod( Master master, RequestContext ctx, St
);
}

/**
 * Minimal {@code Response<Integer>} for tests: it wraps a fixed value, releases nothing,
 * ignores any handler passed to {@link #accept}, and reports no transactions to apply.
 */
private static class IntegerResponse extends Response<Integer>
{
    public IntegerResponse( Integer response )
    {
        super( response, StoreId.DEFAULT, newNoOpReleaser() );
    }

    /** Builds a fresh releaser whose {@code release()} does nothing. */
    private static ResourceReleaser newNoOpReleaser()
    {
        return new ResourceReleaser()
        {
            @Override
            public void release()
            {
                // Intentionally empty: this response holds no resources.
            }
        };
    }

    @Override
    public void accept( Handler handler ) throws IOException
    {
        // Intentionally empty: the handler is never invoked.
    }

    @Override
    public boolean hasTransactionsToBeApplied()
    {
        return false;
    }
}

private SlaveTokenCreatorFixture fixture;
private Master master;
private RequestContext requestContext;
Expand All @@ -140,14 +113,7 @@ public SlaveTokenCreatorTest( String name, SlaveTokenCreatorFixture fixture )
master = mock( Master.class );
requestContext = new RequestContext( 1, 2, 3, 4, 5 );
this.name = "Poke";
requestContextFactory = new RequestContextFactory( 0, null )
{
@Override
public RequestContext newRequestContext( long epoch, int machineId, int eventIdentifier )
{
return requestContext;
}
};
requestContextFactory = new ConstantRequestContextFactory( requestContext );
tokenCreator = fixture.build( master, requestContextFactory );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,103 @@
*/
package org.neo4j.kernel.ha;

import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.test.ConstantRequestContextFactory;
import org.neo4j.test.LongResponse;

import static org.mockito.Matchers.any;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SlaveTransactionCommitProcessTest
{
private AtomicInteger lastSeenEventIdentifier;
private Master master;
private RequestContext requestContext;
private RequestContextFactory reqFactory;
private Response<Long> response;
private PhysicalTransactionRepresentation tx;
private SlaveTransactionCommitProcess commitProcess;

/**
 * Builds a fresh fixture for every test: a mocked master, a request-context factory that always
 * hands out {@code requestContext} while recording the event identifier it was asked for, a canned
 * response, an empty transaction, and the commit process under test.
 */
@Before
public void setUp()
{
    lastSeenEventIdentifier = new AtomicInteger( -1 );
    master = mock( Master.class );
    requestContext = new RequestContext( 10, 11, 12, 13, 14 );
    reqFactory = recordingRequestContextFactory();
    response = new LongResponse( 42L );
    tx = emptyTransactionWithHeader();

    commitProcess = new SlaveTransactionCommitProcess( master, reqFactory );
}

/**
 * Creates a factory that always returns {@code requestContext} and stores the most recently
 * requested event identifier in {@code lastSeenEventIdentifier}.
 */
private RequestContextFactory recordingRequestContextFactory()
{
    return new ConstantRequestContextFactory( requestContext )
    {
        @Override
        public RequestContext newRequestContext( int eventIdentifier )
        {
            lastSeenEventIdentifier.set( eventIdentifier );
            return super.newRequestContext( eventIdentifier );
        }
    };
}

/**
 * Creates a transaction without commands. The trailing header value 1337 is the event identifier
 * that shouldForwardLockIdentifierToMaster expects the factory to observe.
 */
private static PhysicalTransactionRepresentation emptyTransactionWithHeader()
{
    PhysicalTransactionRepresentation representation =
            new PhysicalTransactionRepresentation( Collections.<Command>emptyList() );
    representation.setHeader( new byte[0], 1, 1, 1, 1, 1, 1337 );
    return representation;
}

@Test
public void shouldForwardLockIdentifierToMaster() throws Exception
{
// Given
Master master = mock( Master.class );
RequestContextFactory reqFactory = mock( RequestContextFactory.class );
when( master.commit( requestContext, tx ) ).thenReturn( response );

Response<Long> response = mock(Response.class);
when(response.response()).thenReturn( 1l );
// When
commitProcess.commit( tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );

when(master.commit( any( RequestContext.class), any( TransactionRepresentation.class) )).thenReturn( response );
// Then
assertThat( lastSeenEventIdentifier.get(), is( 1337 ) );
}

SlaveTransactionCommitProcess process = new SlaveTransactionCommitProcess( master, reqFactory );
PhysicalTransactionRepresentation tx = new PhysicalTransactionRepresentation(
Collections.<Command>emptyList() );
tx.setHeader(new byte[]{}, 1, 1, 1, 1, 1, 1337);
@Test( expected = TransientTransactionFailureException.class )
public void mustTranslateComExceptionsToTransientTransactionFailures() throws Exception
{
when( master.commit( requestContext, tx ) ).thenThrow( new ComException() );

// When
process.commit(tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );
commitProcess.commit( tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );
// Then we assert that the right exception is thrown
}

// Then
verify( reqFactory ).newRequestContext( 1337 );
/**
 * An {@link IOException} from the master must reach the caller as a kernel
 * {@link TransactionFailureException} with status {@code CouldNotCommit}.
 */
@Test
public void mustTranslateIOExceptionsToKernelTransactionFailures() throws Exception
{
    // Given
    when( master.commit( requestContext, tx ) ).thenThrow( new IOException() );

    // When
    TransactionFailureException failure = null;
    try
    {
        commitProcess.commit( tx, new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );
    }
    catch ( TransactionFailureException e )
    {
        failure = e;
    }

    // Then
    if ( failure == null )
    {
        fail( "commit should have thrown" );
    }
    assertThat( failure.status(), is( (Status) Status.Transaction.CouldNotCommit ) );
}
}

0 comments on commit ada6fb5

Please sign in to comment.