Skip to content

Commit

Permalink
Fixes a race where some mode switcher could get a stale master client
Browse files Browse the repository at this point in the history
Since the mode switchers were racing with the
HighAvailabilityModeSwitcher which is in charge to set the new master
client when switching to slave, somtimes might happen that the switch
in a way they get the old stopped client.
  • Loading branch information
davidegrohmann committed Aug 27, 2015
1 parent a1d4ff3 commit b4ee518
Show file tree
Hide file tree
Showing 16 changed files with 468 additions and 144 deletions.
Expand Up @@ -514,10 +514,10 @@ public void rolledOver()

guard = config.get( Configuration.execution_guard_enabled ) ? new Guard( msgLog ) : null;

lockManager = createLockManager();

idGeneratorFactory = createIdGeneratorFactory();

lockManager = createLockManager();

StringLogger messagesLog = logging.getMessagesLog( StoreMigrator.class );
VisibleMigrationProgressMonitor progressMonitor =
new VisibleMigrationProgressMonitor( messagesLog, System.out );
Expand Down
Expand Up @@ -200,8 +200,7 @@ public void run()
@Override
public void notify( ClusterMemberListener listener )
{
for ( MemberIsAvailable memberIsAvailable : clusterMembersSnapshot.getCurrentAvailableMembers
() )
for ( MemberIsAvailable memberIsAvailable : clusterMembersSnapshot.getCurrentAvailableMembers() )
{
listener.memberIsAvailable( memberIsAvailable.getRole(), memberIsAvailable.getInstanceId(),
memberIsAvailable.getRoleUri(), memberIsAvailable.getStoreId() );
Expand Down
Expand Up @@ -19,10 +19,8 @@
*/
package org.neo4j.kernel.ha;

import java.net.URI;

import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.transaction.TransactionPropagator;
Expand All @@ -38,17 +36,17 @@ public CommitProcessSwitcher( TransactionPropagator pusher,
Master master,
DelegateInvocationHandler<TransactionCommitProcess> delegate,
RequestContextFactory requestContextFactory,
HighAvailabilityMemberStateMachine memberStateMachine,
ModeSwitcherNotifier modeSwitcherNotifier,
NeoStoreInjectedTransactionValidator validator,
TransactionCommitProcess innerCommitProcess )
{
super( memberStateMachine, delegate );
super( modeSwitcherNotifier, delegate );
this.masterImpl = new MasterTransactionCommitProcess( innerCommitProcess, pusher, validator );
this.slaveImpl = new SlaveTransactionCommitProcess( master, requestContextFactory );
}

@Override
protected TransactionCommitProcess getSlaveImpl( URI serverHaUri )
protected TransactionCommitProcess getSlaveImpl()
{
return slaveImpl;
}
Expand Down
Expand Up @@ -19,12 +19,12 @@
*/
package org.neo4j.kernel.ha;

import org.jboss.netty.logging.InternalLoggerFactory;

import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.Map;

import org.jboss.netty.logging.InternalLoggerFactory;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.client.ClusterClient;
Expand Down Expand Up @@ -459,8 +459,9 @@ public TransactionCommitProcess create( LogicalTransactionStore logicalTransacti
TransactionCommitProcess inner =
defaultCommitProcessFactory.create( logicalTransactionStore, kernelHealth, neoStore,
storeApplier, txValidator, indexUpdatesValidator, config );
assert highAvailabilityModeSwitcher != null;
new CommitProcessSwitcher( pusher, master, commitProcessDelegate, requestContextFactory,
memberStateMachine, txValidator, inner );
highAvailabilityModeSwitcher, txValidator, inner );

return (TransactionCommitProcess) Proxy
.newProxyInstance( TransactionCommitProcess.class.getClassLoader(),
Expand Down Expand Up @@ -537,7 +538,8 @@ protected Locks createLockManager()
DelegateInvocationHandler<Locks> lockManagerDelegate = new DelegateInvocationHandler<>( Locks.class );
Locks lockManager = (Locks) Proxy.newProxyInstance(
Locks.class.getClassLoader(), new Class[]{Locks.class}, lockManagerDelegate );
new LockManagerModeSwitcher( memberStateMachine, lockManagerDelegate, masterDelegateInvocationHandler,
assert highAvailabilityModeSwitcher != null;
new LockManagerModeSwitcher( highAvailabilityModeSwitcher, lockManagerDelegate, masterDelegateInvocationHandler,
requestContextFactory, availabilityGuard, config, new Factory<Locks>()
{
@Override
Expand All @@ -564,7 +566,8 @@ protected TokenCreator createRelationshipTypeCreator()
(TokenCreator) Proxy.newProxyInstance( TokenCreator.class.getClassLoader(),
new Class[]{TokenCreator.class}, relationshipTypeCreatorDelegate );

new RelationshipTypeCreatorModeSwitcher( memberStateMachine, relationshipTypeCreatorDelegate,
assert highAvailabilityModeSwitcher != null;
new RelationshipTypeCreatorModeSwitcher( highAvailabilityModeSwitcher, relationshipTypeCreatorDelegate,
masterDelegateInvocationHandler, requestContextFactory, kernelProvider, idGeneratorFactory );

return relationshipTypeCreator;
Expand All @@ -585,7 +588,8 @@ protected TokenCreator createPropertyKeyCreator()
TokenCreator propertyTokenCreator =
(TokenCreator) Proxy.newProxyInstance( TokenCreator.class.getClassLoader(),
new Class[]{TokenCreator.class}, propertyKeyCreatorDelegate );
new PropertyKeyCreatorModeSwitcher( memberStateMachine, propertyKeyCreatorDelegate,
assert highAvailabilityModeSwitcher != null;
new PropertyKeyCreatorModeSwitcher( highAvailabilityModeSwitcher, propertyKeyCreatorDelegate,
masterDelegateInvocationHandler, requestContextFactory, kernelProvider, idGeneratorFactory );
return propertyTokenCreator;
}
Expand All @@ -605,7 +609,8 @@ protected TokenCreator createLabelIdCreator()
TokenCreator labelIdCreator =
(TokenCreator) Proxy.newProxyInstance( TokenCreator.class.getClassLoader(),
new Class[]{TokenCreator.class}, labelIdCreatorDelegate );
new LabelTokenCreatorModeSwitcher( memberStateMachine, labelIdCreatorDelegate,
assert highAvailabilityModeSwitcher != null;
new LabelTokenCreatorModeSwitcher( highAvailabilityModeSwitcher, labelIdCreatorDelegate,
masterDelegateInvocationHandler, requestContextFactory, kernelProvider, idGeneratorFactory );
return labelIdCreator;
}
Expand Down
Expand Up @@ -19,13 +19,11 @@
*/
package org.neo4j.kernel.ha;

import java.net.URI;

import org.neo4j.helpers.Provider;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.core.DefaultLabelIdCreator;
Expand All @@ -38,13 +36,13 @@ public class LabelTokenCreatorModeSwitcher extends AbstractModeSwitcher<TokenCre
private final Provider<KernelAPI> kernelProvider;
private final IdGeneratorFactory idGeneratorFactory;

public LabelTokenCreatorModeSwitcher( HighAvailabilityMemberStateMachine stateMachine,
public LabelTokenCreatorModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier,
DelegateInvocationHandler<TokenCreator> delegate,
DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory,
Provider<KernelAPI> kernelProvider, IdGeneratorFactory idGeneratorFactory )
{
super( stateMachine, delegate );
super( modeSwitcherNotifier, delegate );
this.master = master;
this.requestContextFactory = requestContextFactory;
this.kernelProvider = kernelProvider;
Expand All @@ -58,7 +56,7 @@ protected TokenCreator getMasterImpl()
}

@Override
protected TokenCreator getSlaveImpl( URI serverHaUri )
protected TokenCreator getSlaveImpl()
{
return new SlaveLabelTokenCreator( master.cement(), requestContextFactory );
}
Expand Down
Expand Up @@ -19,13 +19,11 @@
*/
package org.neo4j.kernel.ha;

import java.net.URI;

import org.neo4j.helpers.Provider;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.core.DefaultPropertyTokenCreator;
Expand All @@ -38,13 +36,13 @@ public class PropertyKeyCreatorModeSwitcher extends AbstractModeSwitcher<TokenCr
private final Provider<KernelAPI> kernelProvider;
private final IdGeneratorFactory idGeneratorFactory;

public PropertyKeyCreatorModeSwitcher( HighAvailabilityMemberStateMachine stateMachine,
public PropertyKeyCreatorModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier,
DelegateInvocationHandler<TokenCreator> delegate,
DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory,
Provider<KernelAPI> kernelProvider, IdGeneratorFactory idGeneratorFactory )
{
super( stateMachine, delegate );
super( modeSwitcherNotifier, delegate );
this.master = master;
this.requestContextFactory = requestContextFactory;
this.kernelProvider = kernelProvider;
Expand All @@ -58,7 +56,7 @@ protected TokenCreator getMasterImpl()
}

@Override
protected TokenCreator getSlaveImpl( URI serverHaUri )
protected TokenCreator getSlaveImpl()
{
return new SlavePropertyTokenCreator( master.cement(), requestContextFactory );
}
Expand Down
Expand Up @@ -19,13 +19,11 @@
*/
package org.neo4j.kernel.ha;

import java.net.URI;

import org.neo4j.helpers.Provider;
import org.neo4j.kernel.IdGeneratorFactory;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.ha.cluster.AbstractModeSwitcher;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.cluster.ModeSwitcherNotifier;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.core.DefaultRelationshipTypeCreator;
Expand All @@ -38,14 +36,14 @@ public class RelationshipTypeCreatorModeSwitcher extends AbstractModeSwitcher<To
private final Provider<KernelAPI> kernelProvider;
private final IdGeneratorFactory idGeneratorFactory;

public RelationshipTypeCreatorModeSwitcher( HighAvailabilityMemberStateMachine stateMachine,
public RelationshipTypeCreatorModeSwitcher( ModeSwitcherNotifier modeSwitcherNotifier,
DelegateInvocationHandler<TokenCreator> delegate,
DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory,
Provider<KernelAPI> kernelProvider,
IdGeneratorFactory idGeneratorFactory )
{
super( stateMachine, delegate );
super( modeSwitcherNotifier, delegate );
this.master = master;
this.requestContextFactory = requestContextFactory;
this.kernelProvider = kernelProvider;
Expand All @@ -59,7 +57,7 @@ protected TokenCreator getMasterImpl()
}

@Override
protected TokenCreator getSlaveImpl( URI serverHaUri )
protected TokenCreator getSlaveImpl()
{
return new SlaveRelationshipTypeCreator( master.cement(), requestContextFactory );
}
Expand Down
Expand Up @@ -227,7 +227,7 @@ public synchronized void unpause()
}
}

interface Condition
public interface Condition
{
boolean evaluate( int currentTicket, int targetTicket );
}
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.kernel.ha.cluster;

import java.net.URI;

import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
Expand All @@ -32,25 +30,25 @@
*
* @param <T>
*/
public abstract class AbstractModeSwitcher<T> implements Lifecycle
public abstract class AbstractModeSwitcher<T> implements ModeSwitcher, Lifecycle
{
private final DelegateInvocationHandler<T> delegate;
private LifeSupport life;
private final HighAvailability highAvailability;
private DelegateStateSwitcher delegateStateSwitcher;
private final ModeSwitcherNotifier notifier;
private T current = null;

private LifeSupport life = new LifeSupport();

protected AbstractModeSwitcher( HighAvailability highAvailability, DelegateInvocationHandler<T> delegate )
protected AbstractModeSwitcher( ModeSwitcherNotifier notifier, DelegateInvocationHandler<T> delegate )
{
this.notifier = notifier;
this.delegate = delegate;
this.life = new LifeSupport();
this.highAvailability = highAvailability;
highAvailability.addHighAvailabilityMemberListener( delegateStateSwitcher = new DelegateStateSwitcher() );
}

@Override
public void init() throws Throwable
{
life.init();
notifier.addModeSwitcher( this );
}

@Override
Expand All @@ -63,7 +61,7 @@ public void start() throws Throwable
public void stop() throws Throwable
{
life.stop();
highAvailability.removeHighAvailabilityMemberListener( delegateStateSwitcher );
notifier.removeModeSwitcher( this );
}

@Override
Expand All @@ -72,69 +70,37 @@ public void shutdown() throws Throwable
life.shutdown();
}

protected abstract T getSlaveImpl( URI serverHaUri );

protected abstract T getMasterImpl();

private class DelegateStateSwitcher implements HighAvailabilityMemberListener
@Override
public void switchToMaster()
{
private T current = null;

@Override
public void masterIsElected( HighAvailabilityMemberChangeEvent event )
{
stateChanged( event );
}

@Override
public void masterIsAvailable( HighAvailabilityMemberChangeEvent event )
{
stateChanged( event );
}

@Override
public void slaveIsAvailable( HighAvailabilityMemberChangeEvent event )
{
}

@Override
public void instanceStops( HighAvailabilityMemberChangeEvent event )
{
stateChanged( event );
}
shutdownCurrent();
delegate.setDelegate( current = life.add( getMasterImpl() ) );
life.start();
}

private void stateChanged( HighAvailabilityMemberChangeEvent event )
{
if ( event.getNewState() == event.getOldState() )
{
return;
}
@Override
public void switchToSlave()
{
shutdownCurrent();
delegate.setDelegate( current = life.add( getSlaveImpl() ) );
life.start();
}

switch ( event.getNewState() )
{
case TO_MASTER:
shutdownCurrent();
delegate.setDelegate( current = life.add( getMasterImpl() ) );
life.start();
break;
case TO_SLAVE:
shutdownCurrent();
delegate.setDelegate( current = life.add( getSlaveImpl( event.getServerHaUri() ) ) );
life.start();
break;
case PENDING:
shutdownCurrent();
break;
}
}
@Override
public void switchToPending()
{
shutdownCurrent();
}

private void shutdownCurrent()
private void shutdownCurrent()
{
if ( current != null )
{
if ( current != null )
{
life.shutdown();
life = new LifeSupport();
}
life.shutdown();
life = new LifeSupport();
}
}

protected abstract T getSlaveImpl();
protected abstract T getMasterImpl();
}

0 comments on commit b4ee518

Please sign in to comment.