Skip to content

Commit

Permalink
Simplified lifecycle handling in ClusterClientModule
Browse files Browse the repository at this point in the history
ClusterClientModule no longer maintains it's own internal lifecycle by instead
entirely relies on the lifecycle passed from the outside.
  • Loading branch information
lutovich committed Oct 28, 2015
1 parent b1aba21 commit aac3b41
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 49 deletions.
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.cluster.client;

import org.jboss.netty.logging.InternalLoggerFactory;

import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand All @@ -27,8 +29,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.logging.InternalLoggerFactory;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.ExecutorLifecycleAdapter;
import org.neo4j.cluster.MultiPaxosServerFactory;
Expand All @@ -41,6 +41,7 @@
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AcceptorInstanceStore;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AtomicBroadcastMessage;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InMemoryAcceptorInstanceStore;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.LearnerMessage;
Expand All @@ -53,6 +54,7 @@
import org.neo4j.cluster.statemachine.StateTransitionLogger;
import org.neo4j.cluster.timeout.FixedTimeoutStrategy;
import org.neo4j.cluster.timeout.MessageTimeoutStrategy;
import org.neo4j.cluster.timeout.TimeoutStrategy;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.Factory;
import org.neo4j.helpers.HostnamePort;
Expand Down Expand Up @@ -80,27 +82,17 @@ public class ClusterClientModule
public static final Setting<Long> clusterJoinTimeout = Settings.setting( "ha.cluster_join_timeout",
Settings.DURATION, "0s" );

public final LifeSupport life;
public final ClusterClient clusterClient;
public final ProtocolServer server;
public final InMemoryAcceptorInstanceStore acceptorInstanceStore;
public final MessageTimeoutStrategy timeoutStrategy;
public final NetworkReceiver receiver;
private final ProtocolServer server;

public ClusterClientModule( LifeSupport parentLife, Dependencies dependencies, final Monitors monitors, final
Config config, LogService logService, ElectionCredentialsProvider electionCredentialsProvider )
public ClusterClientModule( LifeSupport life, Dependencies dependencies, final Monitors monitors,
final Config config, LogService logService, ElectionCredentialsProvider electionCredentialsProvider )
{
this.life = new LifeSupport();
if (parentLife != null)
{
parentLife.add( this.life );
}

final LogProvider internalLogProvider = logService.getInternalLogProvider();
LogProvider internalLogProvider = logService.getInternalLogProvider();

InternalLoggerFactory.setDefaultFactory( new NettyLoggerFactory( internalLogProvider ) );

timeoutStrategy = new MessageTimeoutStrategy(
TimeoutStrategy timeoutStrategy = new MessageTimeoutStrategy(
new FixedTimeoutStrategy( config.get( ClusterSettings.default_timeout ) ) )
.timeout( HeartbeatMessage.sendHeartbeat, config.get( ClusterSettings.heartbeat_interval ) )
.timeout( HeartbeatMessage.timed_out, config.get( ClusterSettings.heartbeat_timeout ) )
Expand All @@ -116,7 +108,7 @@ public ClusterClientModule( LifeSupport parentLife, Dependencies dependencies, f
MultiPaxosServerFactory protocolServerFactory = new MultiPaxosServerFactory( new ClusterConfiguration( config.get( ClusterSettings.cluster_name ),
internalLogProvider ), logService, monitors.newMonitor( StateMachines.Monitor.class ));

receiver = dependencies.satisfyDependency( new NetworkReceiver( monitors.newMonitor( NetworkReceiver.Monitor.class ),
NetworkReceiver receiver = dependencies.satisfyDependency( new NetworkReceiver( monitors.newMonitor( NetworkReceiver.Monitor.class ),
new NetworkReceiver.Configuration()
{
@Override
Expand Down Expand Up @@ -197,23 +189,21 @@ public ExecutorService newInstance()
}
} );



acceptorInstanceStore = new InMemoryAcceptorInstanceStore();
AcceptorInstanceStore acceptorInstanceStore = new InMemoryAcceptorInstanceStore();

server = protocolServerFactory.newProtocolServer( config.get( ClusterSettings.server_id ), timeoutStrategy,
receiver, sender,
acceptorInstanceStore, electionCredentialsProvider, stateMachineExecutor, objectInputStreamFactory,
objectOutputStreamFactory );

this.life.add( sender );
this.life.add( stateMachineExecutor );
this.life.add( receiver );
life.add( sender );
life.add( stateMachineExecutor );
life.add( receiver );

// Timeout timer - triggers every 10 ms
this.life.add( new TimeoutTrigger(server, monitors) );
life.add( new TimeoutTrigger(server, monitors) );

this.life.add( new ClusterJoin( new ClusterJoin.Configuration()
life.add( new ClusterJoin( new ClusterJoin.Configuration()
{
@Override
public List<HostnamePort> getInitialHosts()
Expand Down Expand Up @@ -242,7 +232,7 @@ public long getClusterJoinTimeout()



clusterClient = dependencies.satisfyDependency(new ClusterClient( this.life, server ));
clusterClient = dependencies.satisfyDependency(new ClusterClient( life, server ));
}

private static class TimeoutTrigger implements Lifecycle
Expand Down
Expand Up @@ -168,6 +168,9 @@ public class HighlyAvailableEditionModule
public HighlyAvailableEditionModule( final PlatformModule platformModule )
{
final LifeSupport life = platformModule.life;
final LifeSupport paxosLife = new LifeSupport();
final LifeSupport clusteringLife = new LifeSupport();

final FileSystemAbstraction fs = platformModule.fileSystem;
final File storeDir = platformModule.storeDir;
final Config config = platformModule.config;
Expand Down Expand Up @@ -240,12 +243,12 @@ public HighAvailabilityMemberState getHighAvailabilityMemberState()
ObjectStreamFactory objectStreamFactory = new ObjectStreamFactory();


ClusterClientModule clusterClientModule = new ClusterClientModule(life, dependencies, monitors, config, logging, electionCredentialsProvider );
final ClusterClient clusterClient = clusterClientModule.clusterClient;
ClusterClientModule clusterClientModule = new ClusterClientModule( clusteringLife, dependencies, monitors,
config, logging, electionCredentialsProvider );
ClusterClient clusterClient = clusterClientModule.clusterClient;
PaxosClusterMemberEvents localClusterEvents = new PaxosClusterMemberEvents( clusterClient, clusterClient,
clusterClient, clusterClient, logging.getInternalLogProvider(),
new org.neo4j.function.Predicate<PaxosClusterMemberEvents
.ClusterMembersSnapshot>()
new org.neo4j.function.Predicate<PaxosClusterMemberEvents.ClusterMembersSnapshot>()
{
@Override
public boolean test( PaxosClusterMemberEvents.ClusterMembersSnapshot item )
Expand Down Expand Up @@ -318,8 +321,6 @@ public void elected( String role, InstanceId instanceId, URI electedMember )

members = dependencies.satisfyDependency( new ClusterMembers( observedMembers, stateMachine ) );

LifeSupport paxosLife = new LifeSupport();

memberStateMachine = paxosLife.add( stateMachine );
electionProviderRef.set( memberStateMachine );

Expand Down Expand Up @@ -482,7 +483,6 @@ public MasterServer apply( final Master master, ConversationManager conversation

life.add( requestContextFactory );
life.add( responseUnpacker );
life.add( paxosLife );

platformModule.diagnosticsManager.appendProvider( new HighAvailabilityDiagnostics( memberStateMachine,
clusterClient ) );
Expand Down Expand Up @@ -535,6 +535,10 @@ public void assertSchemaWritesAllowed() throws InvalidTransactionTypeKernelExcep
registerRecovery( config.get( GraphDatabaseFacadeFactory.Configuration.editionName ), dependencies, logging );

publishEditionInfo( config, dependencies.resolveDependency( UsageData.class ) );

// Ordering of lifecycles is important. Clustering infrastructure should start before paxos components
life.add( clusteringLife );
life.add( paxosLife );
}

private void publishEditionInfo( Config config, UsageData sysInfo )
Expand Down
Expand Up @@ -1036,7 +1036,7 @@ public void removeClusterMemberListener( ClusterMemberListener listener )
}
}, clusterClientModule.clusterClient.getServerId() ) );

life.add( new FutureLifecycleAdapter<>( clusterClientModule.life ) );
life.add( new FutureLifecycleAdapter<>( clusterClientLife ) );
}
}

Expand Down
Expand Up @@ -220,8 +220,9 @@ public void failedInstanceShouldReceiveCorrectCoordinatorIdUponRejoiningCluster(
createNodeOn( cluster.getMaster() );
cluster.sync();

ClusterClientModule clusterClient = newClusterClient( new InstanceId( 1 ) );
cleanup.add(clusterClient.life);
LifeSupport life = new LifeSupport();
ClusterClientModule clusterClient = newClusterClient( life, new InstanceId( 1 ) );
cleanup.add( life );

final AtomicReference<InstanceId> coordinatorIdWhenReJoined = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch( 1 );
Expand All @@ -235,7 +236,7 @@ public void enteredCluster( ClusterConfiguration clusterConfiguration )
}
} );

clusterClient.life.start();
life.start();

// Then
assertTrue( latch.await( 20, SECONDS ) );
Expand Down Expand Up @@ -263,7 +264,7 @@ private static Node createNodeOn( HighlyAvailableGraphDatabase db )
}
}

private ClusterClientModule newClusterClient( InstanceId id )
private ClusterClientModule newClusterClient( LifeSupport life, InstanceId id )
{
Map<String,String> configMap = MapUtil.stringMap(
ClusterSettings.initial_hosts.name(), cluster.getInitialHostsConfigString(),
Expand All @@ -273,11 +274,11 @@ private ClusterClientModule newClusterClient( InstanceId id )
Config config = new Config( configMap, GraphDatabaseFacadeFactory.Configuration.class,
GraphDatabaseSettings.class );

LifeSupport life = new LifeSupport();
SimpleLogService logService = new SimpleLogService( FormattedLogProvider.toOutputStream( System.out ), FormattedLogProvider.toOutputStream( System.out ) );
FormattedLogProvider logProvider = FormattedLogProvider.toOutputStream( System.out );
SimpleLogService logService = new SimpleLogService( logProvider, logProvider );

return new ClusterClientModule(life, new Dependencies( ), new Monitors(), config,
logService, new NotElectableElectionCredentialsProvider());
return new ClusterClientModule( life, new Dependencies(), new Monitors(), config, logService,
new NotElectableElectionCredentialsProvider() );
}

private static void attemptTransactions( HighlyAvailableGraphDatabase... dbs )
Expand Down
Expand Up @@ -113,7 +113,10 @@ public static void main( String[] args ) throws IOException
LifeSupport life = new LifeSupport();
life.add(jobScheduler);
Dependencies dependencies = new Dependencies();
ClusterClientModule clusterClientModule = new ClusterClientModule( life, dependencies, new Monitors(), new Config( config ), logService, new NotElectableElectionCredentialsProvider() );

// start network communication
new ClusterClientModule( life, dependencies, new Monitors(), new Config( config ), logService,
new NotElectableElectionCredentialsProvider() );

new StandaloneClusterClient( life );
}
Expand Down
Expand Up @@ -188,11 +188,13 @@ public void before() throws Exception
config.put( server_id.name(), "" + i );
config.put( initial_hosts.name(), ":5001" );

ClusterClientModule clusterClientModule = new ClusterClientModule(null, new Dependencies(), new Monitors(),
new Config(config), NullLogService.getInstance(), new ServerIdElectionCredentialsProvider());
LifeSupport moduleLife = new LifeSupport();
ClusterClientModule clusterClientModule = new ClusterClientModule( moduleLife, new Dependencies(),
new Monitors(), new Config( config ), NullLogService.getInstance(),
new ServerIdElectionCredentialsProvider() );

final ClusterClient client = clusterClientModule.clusterClient;
final CountDownLatch latch = new CountDownLatch( 1 );
ClusterClient client = clusterClientModule.clusterClient;
CountDownLatch latch = new CountDownLatch( 1 );
client.addClusterListener( new ClusterListener.Adapter()
{
@Override
Expand All @@ -202,7 +204,7 @@ public void enteredCluster( ClusterConfiguration configuration )
client.removeClusterListener( this );
}
} );
life.add(clusterClientModule.life);
life.add( moduleLife );
clients[i - 1] = client;
assertTrue( "Didn't join the cluster", latch.await( 20, SECONDS ) );
}
Expand Down

0 comments on commit aac3b41

Please sign in to comment.