diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java index 266b8b4cb7777..3fc9ca4e838bf 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java @@ -23,6 +23,7 @@ import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.discovery.DiscoveredMemberRepository; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; @@ -52,7 +53,11 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member SimpleStorage clusterIdStorage = new SimpleStorage<>( fileSystem, clusterStateDirectory, CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider ); - topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider ); + DiscoveredMemberRepository discoveredMemberRepository = + new DiscoveredMemberRepository( clusterStateDirectory, fileSystem, logProvider ); + + topologyService = discoveryServiceFactory.coreTopologyService( config, myself, discoveredMemberRepository, + logProvider ); BindingService bindingService = new BindingService( clusterIdStorage, topologyService, logProvider, Clocks.systemClock(), () -> sleep( 100 ), 60000 ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveredMemberRepository.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveredMemberRepository.java new file mode 100644 index 0000000000000..5ed4660a8c73d --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveredMemberRepository.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Set; + +import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toSet; + +public class DiscoveredMemberRepository +{ + private final String filename = "DiscoveredMemberAddresses.txt"; + private final FileSystemAbstraction fileSystem; + private final File file; + private final Log log; + + public DiscoveredMemberRepository( File directory, FileSystemAbstraction fileSystem, LogProvider logProvider ) + { + this.file = new File( directory, filename ); + this.fileSystem = fileSystem; + this.log = logProvider.getLog( getClass() ); + } + + public synchronized Set previouslyDiscoveredMembers() + { + if ( fileSystem.fileExists( file ) ) + { + try ( BufferedReader reader = new BufferedReader( fileSystem.openAsReader( file, UTF_8 ) ) ) + { + return reader.lines().map( AdvertisedSocketAddress::new ).collect( toSet() ); + } + catch ( IOException e ) + { + log.warn( String.format( "Failed to read previously discovered members from %s ", + file.getAbsolutePath() ), e ); + } + } + return emptySet(); + } + + public synchronized void store( Set discoveredMembers ) + { + try ( PrintWriter writer = new PrintWriter( fileSystem.openAsWriter( file, UTF_8, false ) ) ) + { + for ( AdvertisedSocketAddress member : discoveredMembers ) + { + writer.println( member.toString() ); + } + } + catch ( IOException e ) + { + log.warn( String.format( "Failed to store discovered members to %s ", + file.getAbsolutePath() ), e ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java index b98b008631f41..a8162c97ed4b8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/DiscoveryServiceFactory.java @@ -27,7 +27,8 @@ public interface DiscoveryServiceFactory { - CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider ); + CoreTopologyService coreTopologyService( Config config, MemberId myself, + DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider ); TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java index 84ea1ac03eb7c..b85eb5979ba87 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastCoreTopologyService.java @@ -31,11 +31,13 @@ import com.hazelcast.instance.GroupProperties; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; -import org.neo4j.coreedge.identity.ClusterId; -import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; import org.neo4j.coreedge.messaging.address.ListenSocketAddress; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -46,16 +48,19 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol { private final Config config; private final MemberId myself; + private final DiscoveredMemberRepository discoveredMemberRepository; private final Log log; private final CoreTopologyListenerService listenerService; private String membershipRegistrationId; private HazelcastInstance hazelcastInstance; - HazelcastCoreTopologyService( Config config, MemberId myself, LogProvider logProvider ) + HazelcastCoreTopologyService( Config config, MemberId myself, DiscoveredMemberRepository discoveredMemberRepository, + LogProvider logProvider ) { this.config = config; this.myself = myself; + this.discoveredMemberRepository = discoveredMemberRepository; this.listenerService = new CoreTopologyListenerService(); this.log = logProvider.getLog( getClass() ); } @@ -78,6 +83,16 @@ public void memberAdded( MembershipEvent membershipEvent ) { log.info( "Core member added %s", membershipEvent ); log.info( "Current topology is %s", currentTopology() ); + notifyMembershipChange(); + } + + private void notifyMembershipChange() + { + Set members = hazelcastInstance.getCluster().getMembers().stream() + .map( member -> new AdvertisedSocketAddress( + String.format( "%s:%d", member.getSocketAddress().getHostName(), + member.getSocketAddress().getPort() ) ) ).collect( Collectors.toSet() ); + discoveredMemberRepository.store( members ); listenerService.notifyListeners(); } @@ -86,7 +101,7 @@ public void memberRemoved( MembershipEvent membershipEvent ) { log.info( "Core member removed %s", membershipEvent ); log.info( "Current topology is %s", currentTopology() ); - listenerService.notifyListeners(); + notifyMembershipChange(); } @Override @@ -100,7 +115,7 @@ public void start() hazelcastInstance = createHazelcastInstance(); log.info( "Cluster discovery service started" ); membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this ); - listenerService.notifyListeners(); + notifyMembershipChange(); } @Override @@ -132,7 +147,13 @@ private HazelcastInstance createHazelcastInstance() { tcpIpConfig.addMember( address.toString() ); } - log.info( "Discovering cluster with initial members: " + initialMembers ); + Set previouslySeenMembers = discoveredMemberRepository.previouslyDiscoveredMembers(); + for ( AdvertisedSocketAddress seenAddress : previouslySeenMembers ) + { + tcpIpConfig.addMember( seenAddress.toString() ); + } + log.info( String.format( "Discovering cluster with initial members: %s and previously seen members: %s", + initialMembers, previouslySeenMembers ) ); NetworkConfig networkConfig = new NetworkConfig(); ListenSocketAddress hazelcastAddress = config.get( CoreEdgeClusterSettings.discovery_listen_address ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java index bd17ca4080a93..b5eaf16e9fe4c 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/discovery/HazelcastDiscoveryServiceFactory.java @@ -29,11 +29,12 @@ public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory { @Override - public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider ) + public CoreTopologyService coreTopologyService( Config config, MemberId myself, + DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider ) { makeHazelcastSilent( config ); hazelcastShouldNotPhoneHome(); - return new HazelcastCoreTopologyService( config, myself, logProvider ); + return new HazelcastCoreTopologyService( config, myself, discoveredMemberRepository, logProvider ); } @Override diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/DiscoveredMemberRepositoryTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/DiscoveredMemberRepositoryTest.java new file mode 100644 index 0000000000000..aa04c7d619c1e --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/DiscoveredMemberRepositoryTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.discovery; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.Set; + +import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.logging.NullLogProvider; +import org.neo4j.test.rule.TestDirectory; + +import static org.junit.Assert.assertEquals; +import static org.neo4j.helpers.collection.Iterators.set; + +public class DiscoveredMemberRepositoryTest +{ + @Rule + public final TestDirectory testDirectory = TestDirectory.testDirectory(); + + private final FileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction(); + + @Test + public void shouldStoreDiscoveredMembers() throws Exception + { + // given + DiscoveredMemberRepository discoveredMemberRepositoryA = + new DiscoveredMemberRepository( testDirectory.directory(), fileSystem, NullLogProvider.getInstance() ); + + Set members = + set( new AdvertisedSocketAddress( "localhost:5003" ), new AdvertisedSocketAddress( "localhost:5004" ), + new AdvertisedSocketAddress( "localhost:5005" ) ); + + discoveredMemberRepositoryA.store( members ); + + // when + DiscoveredMemberRepository discoveredMemberRepositoryB = + new DiscoveredMemberRepository( testDirectory.directory(), fileSystem, NullLogProvider.getInstance() ); + + // then + assertEquals(members, discoveredMemberRepositoryB.previouslyDiscoveredMembers() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java index 1e8b879335c2c..de77cc8716fd9 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/SharedDiscoveryService.java @@ -52,7 +52,8 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory private AtomicReference clusterId = new AtomicReference<>(); @Override - public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider ) + public CoreTopologyService coreTopologyService( Config config, MemberId myself, + DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider ) { return new SharedDiscoveryCoreClient( this, myself, logProvider, config ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java index 1b5cfb56e76e7..2fff6b7434752 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterMembershipChangeIT.java @@ -19,8 +19,8 @@ */ package org.neo4j.coreedge.scenarios; -import org.junit.Ignore; import org.junit.Rule; +import org.junit.Test; import java.util.List; @@ -49,7 +49,7 @@ public class ClusterMembershipChangeIT public final ClusterRule clusterRule = new ClusterRule( getClass() ).withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() ) .withNumberOfCoreMembers( 3 ); - @Ignore( "Incomplete, HC will hang waiting for others to join." ) + @Test public void newMemberNotInInitialMembersConfig() throws Throwable { // when