Skip to content

Commit

Permalink
Prioritizes fast switch to master in HA
Browse files Browse the repository at this point in the history
Previously as part of switching to master there was waiting introduced to
let active transactions complete normally before switching. The concept of
doing that is nice, but that comes at an expense of cluster downtime when
there will be times where the slave that was picked to become master could
wait for a long time for active transactions that perhaps would never
complete or were blocked on some condition related to the previous master
no longer being available. Reasons why having master switching wait is
bad:

 - A master in a cluster is very important, without it the cluster cannot
   process any requests
 - Awaiting open transactions to complete assumes that this instance just
   now was a slave that is switching to master, which means the previous
   master where these active transactions were hosted is no longer available
   so these open transactions cannot continue and complete anyway,
   so what's the point waiting for them?
 - Read transactions may still be able to complete, but the correct
   response to failures in those is to have them throw transient error exceptions
   hinting that they should be retried, at which point they may get redirected
   to another instance, or to this instance if it has completed the switch until then.

There was a related wait in SlaveLocksClient which was there to bridge
some gap if slave in the middle of a transaction found itself unavailable.
This waiting would wait for the slave to become available again, with the
hope that the same cluster member was still master and hadn't restarted
from last time this transaction saw the master. A better way to handle
this is to make sure such transaction threads get notified about this
fact via a transient error exception, so that the transaction can be
retried. Another problem with waiting at this point in time was that
already acquired locks would be held for a long time, blocking other
transactions in the cluster.

While doing this the setting `ha.state_switch_timeout` was split into that
and `ha.internal_state_switch_timeout`. Previously that timeout setting
was used both for timeout for request threads as well as timeout for
internal switching logic, which was just a mixup that shouldn't have been
there to begin with. This means that members that switch to slave wait for
active transactions to complete, using `ha.internal_state_switch_timeout`
and requests wanting to begin a transaction or similar wait for the db to be
available using `ha.state_switch_timeout`, which is much longer.
  • Loading branch information
tinwelint committed Oct 25, 2015
1 parent 3c4fc29 commit d752563
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 146 deletions.
Expand Up @@ -234,6 +234,17 @@ public boolean isAvailable( long millis )
return availability( millis ) == Availability.AVAILABLE; return availability( millis ) == Availability.AVAILABLE;
} }


/**
 * Checks availability right now. If the database is not available then an
 * {@link UnavailableException} is thrown describing why.
 * This method doesn't wait like {@link #await(long)} does — it delegates to
 * {@link #await(long)} with a zero timeout, so it returns (or throws) immediately.
 *
 * @throws UnavailableException if not available, with a message describing what is blocking.
 */
public void checkAvailable() throws UnavailableException
{
    await( 0 );
}

/** /**
* Await the database becoming available. * Await the database becoming available.
* *
Expand Down
Expand Up @@ -23,11 +23,16 @@


import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import org.neo4j.helpers.Clock;
import org.neo4j.helpers.TickingClock; import org.neo4j.helpers.TickingClock;
import org.neo4j.kernel.AvailabilityGuard.UnavailableException;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.NullLog;


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -333,4 +338,27 @@ public void shouldExplainWhoIsBlockingAccess() throws
// Then // Then
assertThat( availabilityGuard.describeWhoIsBlocking(), equalTo( "2 reasons for blocking: Requirement 1, Requirement 2." ) ); assertThat( availabilityGuard.describeWhoIsBlocking(), equalTo( "2 reasons for blocking: Requirement 1, Requirement 2." ) );
} }

@Test
public void shouldExplainBlockersOnCheckAvailable() throws Exception
{
    // GIVEN a guard with no requirements registered yet
    AvailabilityGuard guard = new AvailabilityGuard( Clock.SYSTEM_CLOCK, NullLog.getInstance() );
    // sanity: nothing blocks availability at this point, so this must not throw
    guard.checkAvailable();

    // WHEN a requirement starts blocking availability
    guard.require( REQUIREMENT_1 );

    // THEN checkAvailable must throw, and the exception must name the blocker
    try
    {
        guard.checkAvailable();
        fail( "Should not be available" );
    }
    catch ( UnavailableException e )
    {
        assertThat( e.getMessage(), containsString( REQUIREMENT_1.description() ) );
    }
}
} }
Expand Up @@ -47,9 +47,14 @@ public class HaSettings
@Description( "How long a slave will wait for response from master before giving up." ) @Description( "How long a slave will wait for response from master before giving up." )
public static final Setting<Long> read_timeout = setting( "ha.read_timeout", DURATION, "20s" ); public static final Setting<Long> read_timeout = setting( "ha.read_timeout", DURATION, "20s" );


@Description( "Timeout for waiting for instance to become master or slave." ) @Description( "Timeout for request threads waiting for instance to become master or slave." )
public static final Setting<Long> state_switch_timeout = setting( "ha.state_switch_timeout", DURATION, "120s" ); public static final Setting<Long> state_switch_timeout = setting( "ha.state_switch_timeout", DURATION, "120s" );


@Description( "Timeout for waiting for internal conditions during state switch, like for transactions "
+ "to complete, before switching to master or slave." )
public static final Setting<Long> internal_state_switch_timeout =
setting( "ha.internal_state_switch_timeout", DURATION, "10s" );

@Description( "Timeout for taking remote (write) locks on slaves. Defaults to ha.read_timeout." ) @Description( "Timeout for taking remote (write) locks on slaves. Defaults to ha.read_timeout." )
public static final Setting<Long> lock_read_timeout = setting( "ha.lock_read_timeout", DURATION, read_timeout ); public static final Setting<Long> lock_read_timeout = setting( "ha.lock_read_timeout", DURATION, read_timeout );


Expand Down
Expand Up @@ -38,13 +38,9 @@
import org.neo4j.kernel.ha.com.master.SlaveFactory; import org.neo4j.kernel.ha.com.master.SlaveFactory;
import org.neo4j.kernel.ha.id.HaIdGeneratorFactory; import org.neo4j.kernel.ha.id.HaIdGeneratorFactory;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.TransactionCounters;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.neo4j.helpers.Clock.SYSTEM_CLOCK;
import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.MASTER; import static org.neo4j.kernel.ha.cluster.HighAvailabilityModeSwitcher.MASTER;


public class SwitchToMaster implements AutoCloseable public class SwitchToMaster implements AutoCloseable
Expand All @@ -53,7 +49,6 @@ public class SwitchToMaster implements AutoCloseable
Factory<ConversationManager> conversationManagerFactory; Factory<ConversationManager> conversationManagerFactory;
BiFunction<ConversationManager, LifeSupport, Master> masterFactory; BiFunction<ConversationManager, LifeSupport, Master> masterFactory;
BiFunction<Master, ConversationManager, MasterServer> masterServerFactory; BiFunction<Master, ConversationManager, MasterServer> masterServerFactory;
private TransactionCounters transactionCounters;
private Log userLog; private Log userLog;
private HaIdGeneratorFactory idGeneratorFactory; private HaIdGeneratorFactory idGeneratorFactory;
private Config config; private Config config;
Expand All @@ -68,13 +63,12 @@ public SwitchToMaster( LogService logService,
BiFunction<ConversationManager, LifeSupport, Master> masterFactory, BiFunction<ConversationManager, LifeSupport, Master> masterFactory,
BiFunction<Master, ConversationManager, MasterServer> masterServerFactory, BiFunction<Master, ConversationManager, MasterServer> masterServerFactory,
DelegateInvocationHandler<Master> masterDelegateHandler, ClusterMemberAvailability clusterMemberAvailability, DelegateInvocationHandler<Master> masterDelegateHandler, ClusterMemberAvailability clusterMemberAvailability,
Supplier<NeoStoreDataSource> dataSourceSupplier, TransactionCounters transactionCounters) Supplier<NeoStoreDataSource> dataSourceSupplier )
{ {
this.logService = logService; this.logService = logService;
this.conversationManagerFactory = conversationManagerFactory; this.conversationManagerFactory = conversationManagerFactory;
this.masterFactory = masterFactory; this.masterFactory = masterFactory;
this.masterServerFactory = masterServerFactory; this.masterServerFactory = masterServerFactory;
this.transactionCounters = transactionCounters;
this.userLog = logService.getUserLog( getClass() ); this.userLog = logService.getUserLog( getClass() );
this.idGeneratorFactory = idGeneratorFactory; this.idGeneratorFactory = idGeneratorFactory;
this.config = config; this.config = config;
Expand All @@ -95,44 +89,38 @@ public URI switchToMaster( LifeSupport haCommunicationLife, URI me )
{ {
userLog.info( "I am %s, moving to master", myId() ); userLog.info( "I am %s, moving to master", myId() );


// Wait for current transactions to stop first // Do not wait for currently active transactions to complete before continuing switching.
long deadline = SYSTEM_CLOCK.currentTimeMillis() + config.get( HaSettings.state_switch_timeout ); // - A master in a cluster is very important, without it the cluster cannot process any write requests
while ( transactionCounters.getNumberOfActiveTransactions() > 0 && SYSTEM_CLOCK.currentTimeMillis() < deadline ) // - Awaiting open transactions to complete assumes that this instance just now was a slave that is
{ // switching to master, which means the previous master where these active transactions were hosted
parkNanos( MILLISECONDS.toNanos( 10 ) ); // is no longer available so these open transactions cannot continue and complete anyway,
} // so what's the point waiting for them?
// - Read transactions may still be able to complete, but the correct response to failures in those
// is to have them throw transient error exceptions hinting that they should be retried,
// at which point they may get redirected to another instance, or to this instance if it has completed
// the switch until then.


/* idGeneratorFactory.switchToMaster();
* Synchronizing on the xaDataSourceManager makes sense if you also look at HaKernelPanicHandler. In NeoStoreDataSource neoStoreXaDataSource = dataSourceSupplier.get();
* particular, it is possible to get a masterIsElected while recovering the database. That is generally neoStoreXaDataSource.afterModeSwitch();
* going to break things. Synchronizing on the xaDSM as HaKPH does solves this.
*/
//noinspection SynchronizationOnLocalVariableOrMethodParameter
// synchronized ( xaDataSourceManager )
{


idGeneratorFactory.switchToMaster(); ConversationManager conversationManager = conversationManagerFactory.newInstance();
NeoStoreDataSource neoStoreXaDataSource = dataSourceSupplier.get(); Master master = masterFactory.apply( conversationManager, haCommunicationLife );
neoStoreXaDataSource.afterModeSwitch();


ConversationManager conversationManager = conversationManagerFactory.newInstance(); MasterServer masterServer = masterServerFactory.apply( master, conversationManager );
Master master = masterFactory.apply( conversationManager, haCommunicationLife );


MasterServer masterServer = masterServerFactory.apply( master, conversationManager ); haCommunicationLife.add( masterServer );
masterDelegateHandler.setDelegate( master );


haCommunicationLife.add( masterServer ); haCommunicationLife.start();
masterDelegateHandler.setDelegate( master );


haCommunicationLife.start(); URI masterHaURI = getMasterUri( me, masterServer );
clusterMemberAvailability.memberIsAvailable( MASTER, masterHaURI, neoStoreXaDataSource.getStoreId() );
userLog.info( "I am %s, successfully moved to master", myId() );


URI masterHaURI = getMasterUri( me, masterServer ); slaveFactorySupplier.get().setStoreId( neoStoreXaDataSource.getStoreId() );
clusterMemberAvailability.memberIsAvailable( MASTER, masterHaURI, neoStoreXaDataSource.getStoreId() );
userLog.info( "I am %s, successfully moved to master", myId() );


slaveFactorySupplier.get().setStoreId( neoStoreXaDataSource.getStoreId() ); return masterHaURI;

return masterHaURI;
}
} }


private URI getMasterUri( URI me, MasterServer masterServer ) private URI getMasterUri( URI me, MasterServer masterServer )
Expand Down
Expand Up @@ -90,6 +90,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos; import static java.util.concurrent.locks.LockSupport.parkNanos;

import static org.neo4j.helpers.Clock.SYSTEM_CLOCK; import static org.neo4j.helpers.Clock.SYSTEM_CLOCK;
import static org.neo4j.helpers.collection.Iterables.filter; import static org.neo4j.helpers.collection.Iterables.filter;
import static org.neo4j.helpers.collection.Iterables.first; import static org.neo4j.helpers.collection.Iterables.first;
Expand Down Expand Up @@ -262,8 +263,9 @@ public URI switchToSlave( LifeSupport haCommunicationLife, URI me, URI masterUri


monitor.switchToSlaveStarted(); monitor.switchToSlaveStarted();


// Wait for current transactions to stop first // Wait a short while for current transactions to stop first, just to be nice.
long deadline = SYSTEM_CLOCK.currentTimeMillis() + config.get( HaSettings.state_switch_timeout ); // We can't wait forever since switching to our designated role is quite important.
long deadline = SYSTEM_CLOCK.currentTimeMillis() + config.get( HaSettings.internal_state_switch_timeout );
while ( transactionCounters.getNumberOfActiveTransactions() > 0 && SYSTEM_CLOCK.currentTimeMillis() < deadline ) while ( transactionCounters.getNumberOfActiveTransactions() > 0 && SYSTEM_CLOCK.currentTimeMillis() < deadline )
{ {
parkNanos( MILLISECONDS.toNanos( 10 ) ); parkNanos( MILLISECONDS.toNanos( 10 ) );
Expand Down
Expand Up @@ -19,13 +19,13 @@
*/ */
package org.neo4j.kernel.ha.factory; package org.neo4j.kernel.ha.factory;


import org.jboss.netty.logging.InternalLoggerFactory;

import java.io.File; import java.io.File;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.URI; import java.net.URI;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


import org.jboss.netty.logging.InternalLoggerFactory;

import org.neo4j.cluster.ClusterSettings; import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.client.ClusterClient; import org.neo4j.cluster.client.ClusterClient;
Expand Down Expand Up @@ -467,8 +467,7 @@ public MasterServer apply( final Master master, ConversationManager conversation
masterServerFactory, masterServerFactory,
masterDelegateInvocationHandler, clusterMemberAvailability, masterDelegateInvocationHandler, clusterMemberAvailability,
platformModule.dependencies.provideDependency( platformModule.dependencies.provideDependency(
NeoStoreDataSource.class ), NeoStoreDataSource.class ));
platformModule.transactionMonitor);


final HighAvailabilityModeSwitcher highAvailabilityModeSwitcher = new HighAvailabilityModeSwitcher( final HighAvailabilityModeSwitcher highAvailabilityModeSwitcher = new HighAvailabilityModeSwitcher(
switchToSlaveInstance, switchToMasterInstance, switchToSlaveInstance, switchToMasterInstance,
Expand Down Expand Up @@ -655,7 +654,7 @@ protected Locks createLockManager( final HighAvailabilityModeSwitcher highAvaila
lockManagerDelegate ); lockManagerDelegate );
paxosLife.add( new LockManagerModeSwitcher( highAvailabilityModeSwitcher, lockManagerDelegate, paxosLife.add( new LockManagerModeSwitcher( highAvailabilityModeSwitcher, lockManagerDelegate,
masterDelegateInvocationHandler, masterDelegateInvocationHandler,
requestContextFactory, availabilityGuard, config, new Factory<Locks>() requestContextFactory, availabilityGuard, new Factory<Locks>()
{ {
@Override @Override
public Locks newInstance() public Locks newInstance()
Expand Down
Expand Up @@ -21,9 +21,7 @@


import org.neo4j.function.Factory; import org.neo4j.function.Factory;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.DelegateInvocationHandler; import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher; import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher;
import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier; import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier;
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
Expand All @@ -36,19 +34,17 @@ public class LockManagerModeSwitcher extends AbstractModeSwitcher<Locks>
private final DelegateInvocationHandler<Master> master; private final DelegateInvocationHandler<Master> master;
private final RequestContextFactory requestContextFactory; private final RequestContextFactory requestContextFactory;
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;
private final Config config;
private final Factory<Locks> locksFactory; private final Factory<Locks> locksFactory;


public LockManagerModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier, public LockManagerModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier,
DelegateInvocationHandler<Locks> delegate, DelegateInvocationHandler<Master> master, DelegateInvocationHandler<Locks> delegate, DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory, AvailabilityGuard availabilityGuard, RequestContextFactory requestContextFactory, AvailabilityGuard availabilityGuard,
Config config, Factory<Locks> locksFactory ) Factory<Locks> locksFactory )
{ {
super( modeSwitcherNotifier, delegate ); super( modeSwitcherNotifier, delegate );
this.master = master; this.master = master;
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.config = config;
this.locksFactory = locksFactory; this.locksFactory = locksFactory;
} }


Expand All @@ -62,14 +58,6 @@ protected Locks getMasterImpl( LifeSupport life )
protected Locks getSlaveImpl( LifeSupport life ) protected Locks getSlaveImpl( LifeSupport life )
{ {
return life.add( new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(), return life.add( new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(),
availabilityGuard, availabilityGuard ) );
new SlaveLockManager.Configuration()
{
@Override
public long getAvailabilityTimeout()
{
return config.get( HaSettings.lock_read_timeout );
}
} ) );
} }
} }
Expand Up @@ -31,19 +31,12 @@ public class SlaveLockManager extends LifecycleAdapter implements Locks
private final Locks local; private final Locks local;
private final Master master; private final Master master;
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;
private final Configuration config;

public interface Configuration
{
long getAvailabilityTimeout();
}


public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextFactory, Master master, public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextFactory, Master master,
AvailabilityGuard availabilityGuard, Configuration config ) AvailabilityGuard availabilityGuard )
{ {
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.config = config;
this.local = localLocks; this.local = localLocks;
this.master = master; this.master = master;
} }
Expand All @@ -52,7 +45,7 @@ public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextF
public Client newClient() public Client newClient()
{ {
return new SlaveLocksClient( return new SlaveLocksClient(
master, local.newClient(), local, requestContextFactory, availabilityGuard, config ); master, local.newClient(), local, requestContextFactory, availabilityGuard );
} }


@Override @Override
Expand Down
Expand Up @@ -25,7 +25,9 @@


import org.neo4j.com.RequestContext; import org.neo4j.com.RequestContext;
import org.neo4j.com.Response; import org.neo4j.com.Response;
import org.neo4j.graphdb.TransientDatabaseFailureException;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.AvailabilityGuard.UnavailableException;
import org.neo4j.kernel.DeadlockDetectedException; import org.neo4j.kernel.DeadlockDetectedException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
Expand Down Expand Up @@ -53,7 +55,6 @@ class SlaveLocksClient implements Locks.Client
private final Locks localLockManager; private final Locks localLockManager;
private final RequestContextFactory requestContextFactory; private final RequestContextFactory requestContextFactory;
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;
private final SlaveLockManager.Configuration config;


// Using atomic ints to avoid creating garbage through boxing. // Using atomic ints to avoid creating garbage through boxing.
private final Map<Locks.ResourceType, Map<Long, AtomicInteger>> sharedLocks; private final Map<Locks.ResourceType, Map<Long, AtomicInteger>> sharedLocks;
Expand All @@ -65,15 +66,13 @@ public SlaveLocksClient(
Locks.Client local, Locks.Client local,
Locks localLockManager, Locks localLockManager,
RequestContextFactory requestContextFactory, RequestContextFactory requestContextFactory,
AvailabilityGuard availabilityGuard, AvailabilityGuard availabilityGuard )
SlaveLockManager.Configuration config )
{ {
this.master = master; this.master = master;
this.client = local; this.client = local;
this.localLockManager = localLockManager; this.localLockManager = localLockManager;
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.config = config;
sharedLocks = new HashMap<>(); sharedLocks = new HashMap<>();
exclusiveLocks = new HashMap<>(); exclusiveLocks = new HashMap<>();
} }
Expand Down Expand Up @@ -275,12 +274,13 @@ private void makeSureTxHasBeenInitialized()
{ {
try try
{ {
availabilityGuard.await( config.getAvailabilityTimeout() ); availabilityGuard.checkAvailable();
} }
catch ( AvailabilityGuard.UnavailableException e ) catch ( UnavailableException e )
{ {
throw new RuntimeException( e.getMessage() ); throw new TransientDatabaseFailureException( "Database not available", e );
} }

if ( !initialized ) if ( !initialized )
{ {
try ( Response<Void> ignored = master.newLockSession( newRequestContextFor( client ) ) ) try ( Response<Void> ignored = master.newLockSession( newRequestContextFor( client ) ) )
Expand Down

0 comments on commit d752563

Please sign in to comment.