Skip to content

Commit

Permalink
Log errors when stopping unsuccessful lock session
Browse files Browse the repository at this point in the history
When KTI is terminated, it stops the associated locks client. For slave write
transactions this results in a END_LOCK_SESSION request to master. Such request
can often be unsuccessful because transactions can be terminated during HA
state switch when communication between cluster members is broken.

This commit makes SlaveLocksClient only log and not rethrow errors from the
END_LOCK_SESSION request. Such errors can otherwise delay/prevent HA state
machine from switching to slave/master.
  • Loading branch information
lutovich committed Jul 19, 2016
1 parent 5dfc798 commit 9e8e6ef
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 52 deletions.
Expand Up @@ -631,7 +631,7 @@ protected Locks createLockManager( HighAvailabilityModeSwitcher highAvailability
LifeSupport modeSwitchersLife, final Config config,
DelegateInvocationHandler<Master> masterDelegateInvocationHandler,
RequestContextFactory requestContextFactory,
AvailabilityGuard availabilityGuard, final LogService logging )
AvailabilityGuard availabilityGuard, final LogService logService )
{
DelegateInvocationHandler<Locks> lockManagerDelegate = new DelegateInvocationHandler<>( Locks.class );
final Locks lockManager = (Locks) newProxyInstance( Locks.class.getClassLoader(),
Expand All @@ -644,9 +644,9 @@ protected Locks createLockManager( HighAvailabilityModeSwitcher highAvailability
@Override
public Locks newInstance()
{
return CommunityEditionModule.createLockManager( config, logging );
return CommunityEditionModule.createLockManager( config, logService );
}
}, config ) );
}, logService.getInternalLogProvider(), config ) );
return lockManager;
}

Expand Down
Expand Up @@ -29,25 +29,28 @@
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;

public class LockManagerModeSwitcher extends AbstractModeSwitcher<Locks>
{
private final DelegateInvocationHandler<Master> master;
private final RequestContextFactory requestContextFactory;
private final AvailabilityGuard availabilityGuard;
private final Factory<Locks> locksFactory;
private final LogProvider logProvider;
private final Config config;

public LockManagerModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier,
DelegateInvocationHandler<Locks> delegate, DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory, AvailabilityGuard availabilityGuard,
Factory<Locks> locksFactory, Config config )
Factory<Locks> locksFactory, LogProvider logProvider, Config config )
{
super( modeSwitcherNotifier, delegate );
this.master = master;
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.locksFactory = locksFactory;
this.logProvider = logProvider;
this.config = config;
}

Expand All @@ -61,6 +64,6 @@ protected Locks getMasterImpl( LifeSupport life )
protected Locks getSlaveImpl( LifeSupport life )
{
return life.add( new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(),
availabilityGuard, config ) );
availabilityGuard, logProvider, config ) );
}
}
Expand Up @@ -26,30 +26,33 @@
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.LogProvider;

public class SlaveLockManager extends LifecycleAdapter implements Locks
{
private final RequestContextFactory requestContextFactory;
private final Locks local;
private final Master master;
private final AvailabilityGuard availabilityGuard;
private final LogProvider logProvider;
private final boolean txTerminationAwareLocks;

public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextFactory, Master master,
AvailabilityGuard availabilityGuard, Config config )
AvailabilityGuard availabilityGuard, LogProvider logProvider, Config config )
{
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.local = localLocks;
this.master = master;
this.logProvider = logProvider;
this.txTerminationAwareLocks = config.get( KernelTransactions.tx_termination_aware_locks );
}

@Override
public Client newClient()
{
Client client = local.newClient();
return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard,
return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard, logProvider,
txTerminationAwareLocks );
}

Expand Down
Expand Up @@ -36,6 +36,8 @@
import org.neo4j.kernel.impl.locking.LockClientStoppedException;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.locking.LockType.READ;
import static org.neo4j.kernel.impl.locking.LockType.WRITE;
Expand All @@ -59,6 +61,7 @@ class SlaveLocksClient implements Locks.Client
// Using atomic ints to avoid creating garbage through boxing.
private final Map<Locks.ResourceType, Map<Long, AtomicInteger>> sharedLocks;
private final Map<Locks.ResourceType, Map<Long, AtomicInteger>> exclusiveLocks;
private final Log log;
private final boolean txTerminationAwareLocks;
private boolean initialized;
private volatile boolean stopped;
Expand All @@ -69,13 +72,15 @@ public SlaveLocksClient(
Locks localLockManager,
RequestContextFactory requestContextFactory,
AvailabilityGuard availabilityGuard,
LogProvider logProvider,
boolean txTerminationAwareLocks )
{
this.master = master;
this.client = local;
this.localLockManager = localLockManager;
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.log = logProvider.getLog( getClass() );
this.txTerminationAwareLocks = txTerminationAwareLocks;
sharedLocks = new HashMap<>();
exclusiveLocks = new HashMap<>();
Expand Down Expand Up @@ -203,7 +208,7 @@ public void stop()
if ( txTerminationAwareLocks )
{
client.stop();
endLockSessionOnMaster( false );
stopLockSessionOnMaster();
stopped = true;
}
}
Expand All @@ -218,7 +223,7 @@ public void close()
{
if ( !stopped )
{
endLockSessionOnMaster( true );
closeLockSessionOnMaster();
stopped = true;
}
initialized = false;
Expand All @@ -232,6 +237,23 @@ public int getLockSessionId()
return initialized ? client.getLockSessionId() : -1;
}

private void stopLockSessionOnMaster()
{
try
{
endLockSessionOnMaster( false );
}
catch ( Throwable t )
{
log.warn( "Unable to stop lock session on master", t );
}
}

private void closeLockSessionOnMaster()
{
endLockSessionOnMaster( true );
}

private void endLockSessionOnMaster( boolean success )
{
try ( Response<Void> ignored = master.endLockSession( newRequestContextFor( client ), success ) )
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.kernel.impl.locking.community.CommunityLockManger;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -87,6 +88,7 @@ public void doesNotCreateClientsAfterShutdown() throws Throwable

private SlaveLockManager newSlaveLockManager( Locks localLocks )
{
return new SlaveLockManager( localLocks, requestContextFactory, master, availabilityGuard, new Config() );
return new SlaveLockManager( localLocks, requestContextFactory, master, availabilityGuard,
NullLogProvider.getInstance(), new Config() );
}
}
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;

import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void readersCanAcquireLockAsSoonAsItReleasedOnMaster() throws Interrupted
private SlaveLocksClient createClient()
{
return new SlaveLocksClient( master, lockManager.newClient(), lockManager,
requestContextFactory, availabilityGuard, false );
requestContextFactory, availabilityGuard, NullLogProvider.getInstance(), false );
}

private static class LockedOnMasterAnswer implements Answer
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.ha.lock;

import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -43,12 +44,12 @@
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.kernel.impl.locking.community.CommunityLockManger;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLog;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
Expand All @@ -62,6 +63,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.kernel.impl.locking.ResourceTypes.NODE;
import static org.neo4j.logging.AssertableLogProvider.inLog;

public class SlaveLocksClientTest
{
Expand All @@ -70,6 +72,7 @@ public class SlaveLocksClientTest
private Locks.Client local;
private SlaveLocksClient client;
private AvailabilityGuard availabilityGuard;
private AssertableLogProvider logProvider;

@Before
public void setUp() throws Exception
Expand All @@ -79,6 +82,7 @@ public void setUp() throws Exception

lockManager = new CommunityLockManger();
local = spy( lockManager.newClient() );
logProvider = new AssertableLogProvider();

LockResult lockResultOk = new LockResult( LockStatus.OK_LOCKED );
TransactionStreamResponse<LockResult> responseOk =
Expand Down Expand Up @@ -502,37 +506,29 @@ public void closeWhenNotInitialized()
}

@Test
public void stopThrowsWhenMasterCommunicationThrowsComException()
public void stopDoesNotThrowWhenMasterCommunicationThrowsComException()
{
ComException error = new ComException( "Communication failure" );
when( master.endLockSession( any( RequestContext.class ), anyBoolean() ) ).thenThrow( error );

try
{
client.stop();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( DistributedLockFailureException.class ) );
}
client.stop();

logProvider.assertExactly( inLog( SlaveLocksClient.class )
.warn( equalTo( "Unable to stop lock session on master" ),
CoreMatchers.<Throwable>instanceOf( DistributedLockFailureException.class ) ) );
}

@Test
public void stopThrowsWhenMasterCommunicationThrows()
public void stopDoesNotThrowWhenMasterCommunicationThrows()
{
RuntimeException error = new IllegalArgumentException( "Wrong params" );
when( master.endLockSession( any( RequestContext.class ), anyBoolean() ) ).thenThrow( error );

try
{
client.stop();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertEquals( error, e );
}
client.stop();

logProvider.assertExactly( inLog( SlaveLocksClient.class )
.warn( equalTo( "Unable to stop lock session on master" ),
CoreMatchers.<Throwable>equalTo( error ) ) );
}

@Test
Expand All @@ -549,7 +545,7 @@ public void stopDoesNothingWhenLocksAreNotTxTerminationAware()
private SlaveLocksClient newSlaveLocksClient( Locks lockManager, boolean txTerminationAwareLocks )
{
return new SlaveLocksClient( master, local, lockManager, mock( RequestContextFactory.class ),
availabilityGuard, txTerminationAwareLocks );
availabilityGuard, logProvider, txTerminationAwareLocks );
}

private SlaveLocksClient stoppedClient()
Expand Down
Expand Up @@ -21,7 +21,13 @@

import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -41,15 +47,13 @@
import org.neo4j.test.ha.ClusterRule;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.Workers;

import static org.junit.Assert.assertEquals;

import static java.lang.System.currentTimeMillis;

import static org.junit.Assert.assertEquals;
import static org.neo4j.helpers.TimeUtil.parseTimeMillis;
import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.MASTER;
import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.UNKNOWN;
import static org.neo4j.kernel.impl.MyRelTypes.TEST;
import static org.neo4j.kernel.impl.ha.ClusterManager.memberSeesOtherMemberAsFailed;
import static org.neo4j.kernel.impl.api.KernelTransactions.tx_termination_aware_locks;
import static org.neo4j.kernel.impl.ha.ClusterManager.memberThinksItIsRole;

/**
Expand Down Expand Up @@ -77,27 +81,38 @@
* This test is a stress test and duration of execution can be controlled via system property
* -D{@link org.neo4j.kernel.ha.transaction.TransactionThroughMasterSwitchStressIT}.duration
*/
@RunWith( Parameterized.class )
public class TransactionThroughMasterSwitchStressIT
{
@Parameter
public boolean txTerminationAwareLocks;

@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withInstanceSetting( HaSettings.slave_only,
new IntFunction<String>() // instances 1 and 2 are slave only
.withSharedSetting( tx_termination_aware_locks, String.valueOf( txTerminationAwareLocks ) )
.withInstanceSetting( HaSettings.slave_only,
new IntFunction<String>() // instances 1 and 2 are slave only
{
@Override
public String apply( int value )
{
@Override
public String apply( int value )
if ( value == 1 || value == 2 )
{
if ( value == 1 || value == 2 )
{
return Settings.TRUE;
}
else
{
return Settings.FALSE;
}
return Settings.TRUE;
}
else
{
return Settings.FALSE;
}
}
);
}
);

@Parameters(name = "txTerminationAwareLocks={0}")
public static List<Object[]> txTerminationAwareLocks()
{
return Arrays.asList( new Object[]{false}, new Object[]{true} );
}

@Test
public void shouldNotHaveTransactionsRunningThroughRoleSwitchProduceInconsistencies() throws Throwable
Expand Down

0 comments on commit 9e8e6ef

Please sign in to comment.