From 58f9e534402ab8a181d28033592c68320566e613 Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Fri, 19 Aug 2016 16:32:02 +0200 Subject: [PATCH] Notify the listener immediately when added to SharedDiscoveryCoreClient --- .../consensus/membership/RaftMembershipManager.java | 12 +++++------- .../discovery/RaftDiscoveryServiceConnector.java | 4 +--- .../java/org/neo4j/coreedge/discovery/Cluster.java | 6 ++---- .../neo4j/coreedge/discovery/CoreClusterMember.java | 2 -- .../discovery/SharedDiscoveryCoreClient.java | 8 +++++--- .../java/org/neo4j/test/coreedge/ClusterRule.java | 6 +++--- 6 files changed, 16 insertions(+), 22 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java index 0af85e9f92ec..3d300562a28c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/membership/RaftMembershipManager.java @@ -72,20 +72,18 @@ public class RaftMembershipManager extends LifecycleAdapter implements RaftMembe private Set additionalReplicationMembers = new HashSet<>(); public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder memberSetBuilder, - ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, - long electionTimeout, Clock clock, long catchupTimeout, - StateStorage membershipStorage ) + ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, long electionTimeout, + Clock clock, long catchupTimeout, StateStorage membershipStorage ) { this.sendToMyself = sendToMyself; this.memberSetBuilder = memberSetBuilder; this.raftLog = raftLog; this.expectedClusterSize = expectedClusterSize; this.storage = membershipStorage; - this.log = logProvider.getLog( getClass() ); - this.membershipChanger = new RaftMembershipChanger( raftLog, clock, - electionTimeout, logProvider, catchupTimeout, this ); - } + this.membershipChanger = + new RaftMembershipChanger( raftLog, clock, electionTimeout, logProvider, catchupTimeout, this ); + } public void setRecoverFromIndexSupplier( LongSupplier recoverFromIndexSupplier ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java index 8ab507650148..b8ec526543d0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/RaftDiscoveryServiceConnector.java @@ -41,8 +41,6 @@ public RaftDiscoveryServiceConnector( CoreTopologyService discoveryService, Raft @Override public void start() throws BootstrapException { - discoveryService.addCoreTopologyListener( this ); - ClusterTopology clusterTopology = discoveryService.currentTopology(); Set initialMembers = clusterTopology.coreMembers(); @@ -51,7 +49,7 @@ public void start() throws BootstrapException raftMachine.bootstrapWithInitialMembers( new MemberIdSet( initialMembers ) ); } - onCoreTopologyChange(); + discoveryService.addCoreTopologyListener( this ); } @Override diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java index d3f83c49ceb7..a693f1d04edd 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java @@ -290,10 +290,8 @@ public CoreClusterMember coreTx( BiConsumer op ) return leaderTx( op ); } - private CoreClusterMember addCoreMemberWithId( int memberId, int intendedClusterSize, Map - extraParams, - Map> instanceExtraParams, String - recordFormat ) + private CoreClusterMember addCoreMemberWithId( int memberId, int intendedClusterSize, + Map extraParams, Map> instanceExtraParams, String recordFormat ) { Config config = firstOrNull( coreMembers.values() ).database().getDependencyResolver().resolveDependency( Config.class ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java index c829c6f2efbe..6a49e35fe34e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/CoreClusterMember.java @@ -53,8 +53,6 @@ public class CoreClusterMember private final int serverId; private CoreGraphDatabase database; - static final String CLUSTER_NAME = "core-neo4j"; - public CoreClusterMember( int serverId, int clusterSize, List addresses, DiscoveryServiceFactory discoveryServiceFactory, diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java index a3173bd09a32..64462752e68c 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryCoreClient.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.discovery; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.Set; @@ -37,7 +38,7 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology private final SharedDiscoveryService sharedDiscoveryService; private final MemberId member; private final CoreAddresses coreAddresses; - private final Set listeners = new LinkedHashSet<>(); + private final Set listeners = Collections.synchronizedSet( new LinkedHashSet<>() ); private final Log log; SharedDiscoveryCoreClient( SharedDiscoveryService sharedDiscoveryService, MemberId member, LogProvider logProvider, Config config ) @@ -49,9 +50,10 @@ class SharedDiscoveryCoreClient extends LifecycleAdapter implements CoreTopology } @Override - public synchronized void addCoreTopologyListener( Listener listener ) + public void addCoreTopologyListener( Listener listener ) { listeners.add( listener ); + listener.onCoreTopologyChange(); } @Override @@ -84,7 +86,7 @@ public ClusterTopology currentTopology() return topology; } - synchronized void onTopologyChange() + void onTopologyChange() { log.info( "Notified of topology change" ); listeners.forEach( Listener::onCoreTopologyChange ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java b/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java index 3fabf234288b..2c0a26b2c8af 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/test/coreedge/ClusterRule.java @@ -45,7 +45,7 @@ public class ClusterRule extends ExternalResource private int noCoreMembers = 3; private int noEdgeMembers = 3; - private DiscoveryServiceFactory disoveryServiceFactory = new SharedDiscoveryService(); + private DiscoveryServiceFactory discoveryServiceFactory = new SharedDiscoveryService(); private Map coreParams = stringMap(); private Map> instanceCoreParams = new HashMap<>(); private Map edgeParams = stringMap(); @@ -111,7 +111,7 @@ public Cluster createCluster() throws Exception { if ( cluster == null ) { - cluster = new Cluster( clusterDirectory, noCoreMembers, noEdgeMembers, disoveryServiceFactory, coreParams, + cluster = new Cluster( clusterDirectory, noCoreMembers, noEdgeMembers, discoveryServiceFactory, coreParams, instanceCoreParams, edgeParams, instanceEdgeParams, recordFormat ); } @@ -143,7 +143,7 @@ public ClusterRule withNumberOfEdgeMembers( int noEdgeMembers ) public ClusterRule withDiscoveryServiceFactory( DiscoveryServiceFactory factory ) { - this.disoveryServiceFactory = factory; + this.discoveryServiceFactory = factory; return this; }