Skip to content

Commit

Permalink
Fixed lifecycle issue with TxCommitProcess
Browse files Browse the repository at this point in the history
Issue resulted in a master instance being healthy but unable to commit
any transactions because it's TransactionCommitProcess was not properly
switched on.

Commit process is switched on from a CommitProcessSwitcher which
registers itself as a listener in HighAvailabilityModeSwitcher. Latter
class notifies all registered listeners during slave and master
switches.

Whole clustering infrastructure started up and begin communication with
other machines concurrently with listener registration. This resulted in
a race between listener registration and notification.

This commit puts all ModeSwitcher listeners in a separate LifeSupport
instance which is started before clustering and paxos. Such startup
sequence ensures that all listeners are registered before clustering
communication starts.

Co-authored-by: @MishaDemianenko
  • Loading branch information
lutovich committed Oct 26, 2015
1 parent 611de93 commit d611df5
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 93 deletions.
Expand Up @@ -19,6 +19,8 @@
*/ */
package org.neo4j.cluster.client; package org.neo4j.cluster.client;


import org.jboss.netty.logging.InternalLoggerFactory;

import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand All @@ -27,8 +29,6 @@
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.jboss.netty.logging.InternalLoggerFactory;

import org.neo4j.cluster.ClusterSettings; import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.ExecutorLifecycleAdapter; import org.neo4j.cluster.ExecutorLifecycleAdapter;
import org.neo4j.cluster.MultiPaxosServerFactory; import org.neo4j.cluster.MultiPaxosServerFactory;
Expand All @@ -41,6 +41,7 @@
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AcceptorInstanceStore;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AtomicBroadcastMessage; import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AtomicBroadcastMessage;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InMemoryAcceptorInstanceStore; import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InMemoryAcceptorInstanceStore;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.LearnerMessage; import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.LearnerMessage;
Expand All @@ -53,6 +54,7 @@
import org.neo4j.cluster.statemachine.StateTransitionLogger; import org.neo4j.cluster.statemachine.StateTransitionLogger;
import org.neo4j.cluster.timeout.FixedTimeoutStrategy; import org.neo4j.cluster.timeout.FixedTimeoutStrategy;
import org.neo4j.cluster.timeout.MessageTimeoutStrategy; import org.neo4j.cluster.timeout.MessageTimeoutStrategy;
import org.neo4j.cluster.timeout.TimeoutStrategy;
import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.Factory; import org.neo4j.helpers.Factory;
import org.neo4j.helpers.HostnamePort; import org.neo4j.helpers.HostnamePort;
Expand Down Expand Up @@ -80,27 +82,17 @@ public class ClusterClientModule
public static final Setting<Long> clusterJoinTimeout = Settings.setting( "ha.cluster_join_timeout", public static final Setting<Long> clusterJoinTimeout = Settings.setting( "ha.cluster_join_timeout",
Settings.DURATION, "0s" ); Settings.DURATION, "0s" );


public final LifeSupport life;
public final ClusterClient clusterClient; public final ClusterClient clusterClient;
public final ProtocolServer server; private final ProtocolServer server;
public final InMemoryAcceptorInstanceStore acceptorInstanceStore;
public final MessageTimeoutStrategy timeoutStrategy;
public final NetworkReceiver receiver;


public ClusterClientModule( LifeSupport parentLife, Dependencies dependencies, final Monitors monitors, final public ClusterClientModule( LifeSupport life, Dependencies dependencies, final Monitors monitors,
Config config, LogService logService, ElectionCredentialsProvider electionCredentialsProvider ) final Config config, LogService logService, ElectionCredentialsProvider electionCredentialsProvider )
{ {
this.life = new LifeSupport();
if (parentLife != null)
{
parentLife.add( this.life );
}

final LogProvider internalLogProvider = logService.getInternalLogProvider(); final LogProvider internalLogProvider = logService.getInternalLogProvider();


InternalLoggerFactory.setDefaultFactory( new NettyLoggerFactory( internalLogProvider ) ); InternalLoggerFactory.setDefaultFactory( new NettyLoggerFactory( internalLogProvider ) );


timeoutStrategy = new MessageTimeoutStrategy( TimeoutStrategy timeoutStrategy = new MessageTimeoutStrategy(
new FixedTimeoutStrategy( config.get( ClusterSettings.default_timeout ) ) ) new FixedTimeoutStrategy( config.get( ClusterSettings.default_timeout ) ) )
.timeout( HeartbeatMessage.sendHeartbeat, config.get( ClusterSettings.heartbeat_interval ) ) .timeout( HeartbeatMessage.sendHeartbeat, config.get( ClusterSettings.heartbeat_interval ) )
.timeout( HeartbeatMessage.timed_out, config.get( ClusterSettings.heartbeat_timeout ) ) .timeout( HeartbeatMessage.timed_out, config.get( ClusterSettings.heartbeat_timeout ) )
Expand All @@ -116,7 +108,7 @@ public ClusterClientModule( LifeSupport parentLife, Dependencies dependencies, f
MultiPaxosServerFactory protocolServerFactory = new MultiPaxosServerFactory( new ClusterConfiguration( config.get( ClusterSettings.cluster_name ), MultiPaxosServerFactory protocolServerFactory = new MultiPaxosServerFactory( new ClusterConfiguration( config.get( ClusterSettings.cluster_name ),
internalLogProvider ), logService, monitors.newMonitor( StateMachines.Monitor.class )); internalLogProvider ), logService, monitors.newMonitor( StateMachines.Monitor.class ));


receiver = dependencies.satisfyDependency( new NetworkReceiver( monitors.newMonitor( NetworkReceiver.Monitor.class ), NetworkReceiver receiver = dependencies.satisfyDependency( new NetworkReceiver( monitors.newMonitor( NetworkReceiver.Monitor.class ),
new NetworkReceiver.Configuration() new NetworkReceiver.Configuration()
{ {
@Override @Override
Expand Down Expand Up @@ -199,21 +191,21 @@ public ExecutorService newInstance()






acceptorInstanceStore = new InMemoryAcceptorInstanceStore(); AcceptorInstanceStore acceptorInstanceStore = new InMemoryAcceptorInstanceStore();


server = protocolServerFactory.newProtocolServer( config.get( ClusterSettings.server_id ), timeoutStrategy, server = protocolServerFactory.newProtocolServer( config.get( ClusterSettings.server_id ), timeoutStrategy,
receiver, sender, receiver, sender,
acceptorInstanceStore, electionCredentialsProvider, stateMachineExecutor, objectInputStreamFactory, acceptorInstanceStore, electionCredentialsProvider, stateMachineExecutor, objectInputStreamFactory,
objectOutputStreamFactory ); objectOutputStreamFactory );


this.life.add( sender ); life.add( sender );
this.life.add( stateMachineExecutor ); life.add( stateMachineExecutor );
this.life.add( receiver ); life.add( receiver );


// Timeout timer - triggers every 10 ms // Timeout timer - triggers every 10 ms
this.life.add( new TimeoutTrigger(server, monitors) ); life.add( new TimeoutTrigger(server, monitors) );


this.life.add( new ClusterJoin( new ClusterJoin.Configuration() life.add( new ClusterJoin( new ClusterJoin.Configuration()
{ {
@Override @Override
public List<HostnamePort> getInitialHosts() public List<HostnamePort> getInitialHosts()
Expand All @@ -240,9 +232,7 @@ public long getClusterJoinTimeout()
} }
}, server, logService ) ); }, server, logService ) );



clusterClient = dependencies.satisfyDependency(new ClusterClient( life, server ));

clusterClient = dependencies.satisfyDependency(new ClusterClient( this.life, server ));
} }


private static class TimeoutTrigger implements Lifecycle private static class TimeoutTrigger implements Lifecycle
Expand Down

0 comments on commit d611df5

Please sign in to comment.