Skip to content

Commit

Permalink
Make the SlaveTransactionCommitProcess translate ComExceptions into T…
Browse files Browse the repository at this point in the history
…ransientTransactionFailureExceptions

This should fix the last place, where ComExceptions were allowed to bubble up into user-land.
  • Loading branch information
chrisvest committed Dec 21, 2015
1 parent fd5c97f commit ef6e2d8
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 72 deletions.
Expand Up @@ -21,16 +21,19 @@

import java.io.IOException;

import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;

/**
* Commit process on slaves in HA. Transactions aren't committed here, but sent to the master, committed
Expand Down Expand Up @@ -61,7 +64,14 @@ public long commit( TransactionRepresentation representation, LockGroup locks, C
}
catch ( IOException e )
{
throw new RuntimeException( e );
throw new TransactionFailureException(
Status.Transaction.CouldNotCommit, e, "Could not commit transaction on the master" );
}
catch ( ComException e )
{
throw new TransientTransactionFailureException(
"Cannot commit this transaction on the master. " +
"The master is either down, or we have network connectivity problems.", e );
}
}
}
Expand Up @@ -19,14 +19,15 @@
*/
package org.neo4j.kernel.api;

import java.io.File;

import org.junit.Rule;
import org.junit.Test;

import java.io.File;

import org.neo4j.graphdb.InvalidTransactionTypeException;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.kernel.TopLevelTransaction;
import org.neo4j.kernel.ha.HaSettings;
Expand All @@ -37,13 +38,11 @@
import org.neo4j.test.ha.ClusterRule;

import static java.util.Arrays.asList;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import static org.neo4j.graphdb.DynamicLabel.label;
import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.helpers.collection.Iterables.single;
Expand Down Expand Up @@ -174,7 +173,7 @@ public void shouldNotAllowOldUncommittedTransactionsToResumeAndViolateConstraint
slaveTx.finish();
fail( "Expected this commit to fail :(" );
}
catch( TransactionFailureException e )
catch( TransactionFailureException | TransientTransactionFailureException e )
{
assertThat(e.getCause().getCause(), instanceOf( org.neo4j.kernel.api.exceptions.TransactionFailureException.class ));
}
Expand Down
Expand Up @@ -23,17 +23,16 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;

import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.ResourceReleaser;
import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.test.ConstantRequestContextFactory;
import org.neo4j.test.IntegerResponse;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -101,32 +100,6 @@ public Response<Integer> callMasterMethod( Master master, RequestContext ctx, St
);
}

private static class IntegerResponse extends Response<Integer>
{

public IntegerResponse( Integer response )
{
super( response, StoreId.DEFAULT, new ResourceReleaser()
{
@Override
public void release()
{
}
} );
}

@Override
public void accept( Handler handler ) throws IOException
{
}

@Override
public boolean hasTransactionsToBeApplied()
{
return false;
}
}

private SlaveTokenCreatorFixture fixture;
private Master master;
private RequestContext requestContext;
Expand All @@ -140,14 +113,7 @@ public SlaveTokenCreatorTest( String name, SlaveTokenCreatorFixture fixture )
master = mock( Master.class );
requestContext = new RequestContext( 1, 2, 3, 4, 5 );
this.name = "Poke";
requestContextFactory = new RequestContextFactory( 0, null )
{
@Override
public RequestContext newRequestContext( long epoch, int machineId, int eventIdentifier )
{
return requestContext;
}
};
requestContextFactory = new ConstantRequestContextFactory( requestContext );
tokenCreator = fixture.build( master, requestContextFactory );
}

Expand Down
Expand Up @@ -19,49 +19,103 @@
*/
package org.neo4j.kernel.ha;

import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.com.ComException;
import org.neo4j.com.RequestContext;
import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.test.ConstantRequestContextFactory;
import org.neo4j.test.LongResponse;

import static org.mockito.Matchers.any;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SlaveTransactionCommitProcessTest
{
private AtomicInteger lastSeenEventIdentifier;
private Master master;
private RequestContext requestContext;
private RequestContextFactory reqFactory;
private Response<Long> response;
private PhysicalTransactionRepresentation tx;
private SlaveTransactionCommitProcess commitProcess;

@Before
public void setUp()
{
lastSeenEventIdentifier = new AtomicInteger( -1 );
master = mock( Master.class );
requestContext = new RequestContext( 10, 11, 12, 13, 14 );
reqFactory = new ConstantRequestContextFactory( requestContext )
{
@Override
public RequestContext newRequestContext( int eventIdentifier )
{
lastSeenEventIdentifier.set( eventIdentifier );
return super.newRequestContext( eventIdentifier );
}
};
response = new LongResponse( 42L );
tx = new PhysicalTransactionRepresentation(
Collections.<Command>emptyList() );
tx.setHeader( new byte[]{}, 1, 1, 1, 1, 1, 1337 );

commitProcess = new SlaveTransactionCommitProcess( master, reqFactory );
}

@Test
public void shouldForwardLockIdentifierToMaster() throws Exception
{
// Given
Master master = mock( Master.class );
RequestContextFactory reqFactory = mock( RequestContextFactory.class );
when( master.commit( requestContext, tx ) ).thenReturn( response );

Response<Long> response = mock(Response.class);
when(response.response()).thenReturn( 1l );
// When
commitProcess.commit( tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );

when(master.commit( any( RequestContext.class), any( TransactionRepresentation.class) )).thenReturn( response );
// Then
assertThat( lastSeenEventIdentifier.get(), is( 1337 ) );
}

SlaveTransactionCommitProcess process = new SlaveTransactionCommitProcess( master, reqFactory );
PhysicalTransactionRepresentation tx = new PhysicalTransactionRepresentation(
Collections.<Command>emptyList() );
tx.setHeader(new byte[]{}, 1, 1, 1, 1, 1, 1337);
@Test( expected = TransientTransactionFailureException.class )
public void mustTranslateComExceptionsToTransientTransactionFailures() throws Exception
{
when( master.commit( requestContext, tx ) ).thenThrow( new ComException() );

// When
process.commit(tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );
commitProcess.commit( tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );
// Then we assert that the right exception is thrown
}

// Then
verify( reqFactory ).newRequestContext( 1337 );
@Test
public void mustTranslateIOExceptionsToKernelTransactionFailures() throws Exception
{
when( master.commit( requestContext, tx ) ).thenThrow( new IOException() );

try
{
commitProcess.commit( tx , new LockGroup(), CommitEvent.NULL, TransactionApplicationMode.INTERNAL );
fail( "commit should have thrown" );
}
catch ( TransactionFailureException e )
{
assertThat( e.status(), is( (Status) Status.Transaction.CouldNotCommit ) );
}
}
}
Expand Up @@ -19,24 +19,25 @@
*/
package org.neo4j.kernel.ha;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.com.ComException;
import org.neo4j.graphdb.ConstraintViolationException;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.test.OtherThreadExecutor;
Expand All @@ -47,7 +48,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertThat;

import static org.neo4j.helpers.collection.IteratorUtil.loop;

/**
Expand Down Expand Up @@ -371,7 +371,7 @@ public Integer doWork( Object state ) throws Exception
}
}
}
catch ( TransactionFailureException e )
catch ( TransactionFailureException | TransientTransactionFailureException e )
{
// Swallowed on purpose, we except it to fail sometimes due to either
// - constraint violation on master
Expand All @@ -381,11 +381,6 @@ public Integer doWork( Object state ) throws Exception
{
// Constraint violation detected on slave while building transaction
}
catch ( ComException e )
{
// Happens sometimes, cause:
// - The lock session requested to start is already in use. Please retry your request in a few seconds.
}
return i;
}
};
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.test;

import org.neo4j.com.RequestContext;
import org.neo4j.kernel.ha.com.RequestContextFactory;

public class ConstantRequestContextFactory extends RequestContextFactory
{
private final RequestContext requestContext;

public ConstantRequestContextFactory( RequestContext requestContext )
{
super( 0, null );
this.requestContext = requestContext;
}

@Override
public RequestContext newRequestContext( long epoch, int machineId, int eventIdentifier )
{
return requestContext;
}
}

0 comments on commit ef6e2d8

Please sign in to comment.