Skip to content

Commit

Permalink
Notify the listener immediately when added to SharedDiscoveryCoreClient
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Aug 22, 2016
1 parent 2af5eb3 commit 58f9e53
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 22 deletions.
Expand Up @@ -72,20 +72,18 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe
private Set<MemberId> additionalReplicationMembers = new HashSet<>(); private Set<MemberId> additionalReplicationMembers = new HashSet<>();


public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder<MemberId> memberSetBuilder, public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder<MemberId> memberSetBuilder,
ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, long electionTimeout,
long electionTimeout, Clock clock, long catchupTimeout, Clock clock, long catchupTimeout, StateStorage<RaftMembershipState> membershipStorage )
StateStorage<RaftMembershipState> membershipStorage )
{ {
this.sendToMyself = sendToMyself; this.sendToMyself = sendToMyself;
this.memberSetBuilder = memberSetBuilder; this.memberSetBuilder = memberSetBuilder;
this.raftLog = raftLog; this.raftLog = raftLog;
this.expectedClusterSize = expectedClusterSize; this.expectedClusterSize = expectedClusterSize;
this.storage = membershipStorage; this.storage = membershipStorage;

this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.membershipChanger = new RaftMembershipChanger( raftLog, clock, this.membershipChanger =
electionTimeout, logProvider, catchupTimeout, this ); new RaftMembershipChanger( raftLog, clock, electionTimeout, logProvider, catchupTimeout, this );
} }


public void setRecoverFromIndexSupplier( LongSupplier recoverFromIndexSupplier ) public void setRecoverFromIndexSupplier( LongSupplier recoverFromIndexSupplier )
{ {
Expand Down
Expand Up @@ -41,8 +41,6 @@ public RaftDiscoveryServiceConnector( CoreTopologyService discoveryService, Raft
@Override @Override
public void start() throws BootstrapException public void start() throws BootstrapException
{ {
discoveryService.addCoreTopologyListener( this );

ClusterTopology clusterTopology = discoveryService.currentTopology(); ClusterTopology clusterTopology = discoveryService.currentTopology();
Set<MemberId> initialMembers = clusterTopology.coreMembers(); Set<MemberId> initialMembers = clusterTopology.coreMembers();


Expand All @@ -51,7 +49,7 @@ public void start() throws BootstrapException
raftMachine.bootstrapWithInitialMembers( new MemberIdSet( initialMembers ) ); raftMachine.bootstrapWithInitialMembers( new MemberIdSet( initialMembers ) );
} }


onCoreTopologyChange(); discoveryService.addCoreTopologyListener( this );
} }


@Override @Override
Expand Down
Expand Up @@ -290,10 +290,8 @@ public CoreClusterMember coreTx( BiConsumer<CoreGraphDatabase, Transaction> op )
return leaderTx( op ); return leaderTx( op );
} }


private CoreClusterMember addCoreMemberWithId( int memberId, int intendedClusterSize, Map<String, String> private CoreClusterMember addCoreMemberWithId( int memberId, int intendedClusterSize,
extraParams, Map<String,String> extraParams, Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
Map<String, IntFunction<String>> instanceExtraParams, String
recordFormat )
{ {
Config config = firstOrNull( coreMembers.values() ).database().getDependencyResolver().resolveDependency( Config config = firstOrNull( coreMembers.values() ).database().getDependencyResolver().resolveDependency(
Config.class ); Config.class );
Expand Down
Expand Up @@ -53,8 +53,6 @@ public class CoreClusterMember
private final int serverId; private final int serverId;
private CoreGraphDatabase database; private CoreGraphDatabase database;


static final String CLUSTER_NAME = "core-neo4j";

public CoreClusterMember( int serverId, int clusterSize, public CoreClusterMember( int serverId, int clusterSize,
List<AdvertisedSocketAddress> addresses, List<AdvertisedSocketAddress> addresses,
DiscoveryServiceFactory discoveryServiceFactory, DiscoveryServiceFactory discoveryServiceFactory,
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/ */
package org.neo4j.coreedge.discovery; package org.neo4j.coreedge.discovery;


import java.util.Collections;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;


Expand All @@ -37,7 +38,7 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology
private final SharedDiscoveryService sharedDiscoveryService; private final SharedDiscoveryService sharedDiscoveryService;
private final MemberId member; private final MemberId member;
private final CoreAddresses coreAddresses; private final CoreAddresses coreAddresses;
private final Set<Listener> listeners = new LinkedHashSet<>(); private final Set<Listener> listeners = Collections.synchronizedSet( new LinkedHashSet<>() );
private final Log log; private final Log log;


SharedDiscoveryCoreClient( SharedDiscoveryService sharedDiscoveryService, MemberId member, LogProvider logProvider, Config config ) SharedDiscoveryCoreClient( SharedDiscoveryService sharedDiscoveryService, MemberId member, LogProvider logProvider, Config config )
Expand All @@ -49,9 +50,10 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology
} }


@Override @Override
public synchronized void addCoreTopologyListener( Listener listener ) public void addCoreTopologyListener( Listener listener )
{ {
listeners.add( listener ); listeners.add( listener );
listener.onCoreTopologyChange();
} }


@Override @Override
Expand Down Expand Up @@ -84,7 +86,7 @@ public ClusterTopology currentTopology()
return topology; return topology;
} }


synchronized void onTopologyChange() void onTopologyChange()
{ {
log.info( "Notified of topology change" ); log.info( "Notified of topology change" );
listeners.forEach( Listener::onCoreTopologyChange ); listeners.forEach( Listener::onCoreTopologyChange );
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class ClusterRule extends ExternalResource


private int noCoreMembers = 3; private int noCoreMembers = 3;
private int noEdgeMembers = 3; private int noEdgeMembers = 3;
private DiscoveryServiceFactory disoveryServiceFactory = new SharedDiscoveryService(); private DiscoveryServiceFactory discoveryServiceFactory = new SharedDiscoveryService();
private Map<String,String> coreParams = stringMap(); private Map<String,String> coreParams = stringMap();
private Map<String,IntFunction<String>> instanceCoreParams = new HashMap<>(); private Map<String,IntFunction<String>> instanceCoreParams = new HashMap<>();
private Map<String,String> edgeParams = stringMap(); private Map<String,String> edgeParams = stringMap();
Expand Down Expand Up @@ -111,7 +111,7 @@ public Cluster createCluster() throws Exception
{ {
if ( cluster == null ) if ( cluster == null )
{ {
cluster = new Cluster( clusterDirectory, noCoreMembers, noEdgeMembers, disoveryServiceFactory, coreParams, cluster = new Cluster( clusterDirectory, noCoreMembers, noEdgeMembers, discoveryServiceFactory, coreParams,
instanceCoreParams, edgeParams, instanceEdgeParams, recordFormat ); instanceCoreParams, edgeParams, instanceEdgeParams, recordFormat );


} }
Expand Down Expand Up @@ -143,7 +143,7 @@ public ClusterRule withNumberOfEdgeMembers( int noEdgeMembers )


public ClusterRule withDiscoveryServiceFactory( DiscoveryServiceFactory factory ) public ClusterRule withDiscoveryServiceFactory( DiscoveryServiceFactory factory )
{ {
this.disoveryServiceFactory = factory; this.discoveryServiceFactory = factory;
return this; return this;
} }


Expand Down

0 comments on commit 58f9e53

Please sign in to comment.