Skip to content

Commit

Permalink
Save discovered members and use during restart to connect again.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Aug 29, 2016
1 parent e7dfe74 commit 9d3151b
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 13 deletions.
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.coreedge.core.state.storage.SimpleStorage;
import org.neo4j.coreedge.discovery.CoreTopologyService;
import org.neo4j.coreedge.discovery.DiscoveredMemberRepository;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;
Expand Down Expand Up @@ -52,7 +53,11 @@ public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, Member
SimpleStorage<ClusterId> clusterIdStorage = new SimpleStorage<>( fileSystem, clusterStateDirectory,
CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider );

topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider );
DiscoveredMemberRepository discoveredMemberRepository =
new DiscoveredMemberRepository( clusterStateDirectory, fileSystem, logProvider );

topologyService = discoveryServiceFactory.coreTopologyService( config, myself, discoveredMemberRepository,
logProvider );
BindingService bindingService = new BindingService( clusterIdStorage, topologyService, logProvider,
Clocks.systemClock(), () -> sleep( 100 ), 60000 );

Expand Down
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;

import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;

public class DiscoveredMemberRepository
{
private final String filename = "DiscoveredMemberAddresses.txt";
private final FileSystemAbstraction fileSystem;
private final File file;
private final Log log;

public DiscoveredMemberRepository( File directory, FileSystemAbstraction fileSystem, LogProvider logProvider )
{
this.file = new File( directory, filename );
this.fileSystem = fileSystem;
this.log = logProvider.getLog( getClass() );
}

public synchronized Set<AdvertisedSocketAddress> previouslyDiscoveredMembers()
{
if ( fileSystem.fileExists( file ) )
{
try ( BufferedReader reader = new BufferedReader( fileSystem.openAsReader( file, UTF_8 ) ) )
{
return reader.lines().map( AdvertisedSocketAddress::new ).collect( toSet() );
}
catch ( IOException e )
{
log.warn( String.format( "Failed to read previously discovered members from %s ",
file.getAbsolutePath() ), e );
}
}
return emptySet();
}

public synchronized void store( Set<AdvertisedSocketAddress> discoveredMembers )
{
try ( PrintWriter writer = new PrintWriter( fileSystem.openAsWriter( file, UTF_8, false ) ) )
{
for ( AdvertisedSocketAddress member : discoveredMembers )
{
writer.println( member.toString() );
}
}
catch ( IOException e )
{
log.warn( String.format( "Failed to store discovered members to %s ",
file.getAbsolutePath() ), e );
}
}
}
Expand Up @@ -27,7 +27,8 @@

public interface DiscoveryServiceFactory
{
CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider );
CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider );

TopologyService edgeDiscoveryService( Config config, AdvertisedSocketAddress boltAddress, LogProvider logProvider, DelayedRenewableTimeoutService timeoutService, long edgeTimeToLiveTimeout, long edgeRefreshRate );
}
Expand Up @@ -31,11 +31,13 @@
import com.hazelcast.instance.GroupProperties;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.coreedge.messaging.address.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -46,16 +48,19 @@ class HazelcastCoreTopologyService extends LifecycleAdapter implements CoreTopol
{
private final Config config;
private final MemberId myself;
private final DiscoveredMemberRepository discoveredMemberRepository;
private final Log log;
private final CoreTopologyListenerService listenerService;
private String membershipRegistrationId;

private HazelcastInstance hazelcastInstance;

HazelcastCoreTopologyService( Config config, MemberId myself, LogProvider logProvider )
HazelcastCoreTopologyService( Config config, MemberId myself, DiscoveredMemberRepository discoveredMemberRepository,
LogProvider logProvider )
{
this.config = config;
this.myself = myself;
this.discoveredMemberRepository = discoveredMemberRepository;
this.listenerService = new CoreTopologyListenerService();
this.log = logProvider.getLog( getClass() );
}
Expand All @@ -78,6 +83,16 @@ public void memberAdded( MembershipEvent membershipEvent )
{
log.info( "Core member added %s", membershipEvent );
log.info( "Current topology is %s", currentTopology() );
notifyMembershipChange();
}

private void notifyMembershipChange()
{
Set<AdvertisedSocketAddress> members = hazelcastInstance.getCluster().getMembers().stream()
.map( member -> new AdvertisedSocketAddress(
String.format( "%s:%d", member.getSocketAddress().getHostName(),
member.getSocketAddress().getPort() ) ) ).collect( Collectors.toSet() );
discoveredMemberRepository.store( members );
listenerService.notifyListeners();
}

Expand All @@ -86,7 +101,7 @@ public void memberRemoved( MembershipEvent membershipEvent )
{
log.info( "Core member removed %s", membershipEvent );
log.info( "Current topology is %s", currentTopology() );
listenerService.notifyListeners();
notifyMembershipChange();
}

@Override
Expand All @@ -100,7 +115,7 @@ public void start()
hazelcastInstance = createHazelcastInstance();
log.info( "Cluster discovery service started" );
membershipRegistrationId = hazelcastInstance.getCluster().addMembershipListener( this );
listenerService.notifyListeners();
notifyMembershipChange();
}

@Override
Expand Down Expand Up @@ -132,7 +147,13 @@ private HazelcastInstance createHazelcastInstance()
{
tcpIpConfig.addMember( address.toString() );
}
log.info( "Discovering cluster with initial members: " + initialMembers );
Set<AdvertisedSocketAddress> previouslySeenMembers = discoveredMemberRepository.previouslyDiscoveredMembers();
for ( AdvertisedSocketAddress seenAddress : previouslySeenMembers )
{
tcpIpConfig.addMember( seenAddress.toString() );
}
log.info( String.format( "Discovering cluster with initial members: %s and previously seen members: %s",
initialMembers, previouslySeenMembers ) );

NetworkConfig networkConfig = new NetworkConfig();
ListenSocketAddress hazelcastAddress = config.get( CoreEdgeClusterSettings.discovery_listen_address );
Expand Down
Expand Up @@ -29,11 +29,12 @@
public class HazelcastDiscoveryServiceFactory implements DiscoveryServiceFactory
{
@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider )
{
makeHazelcastSilent( config );
hazelcastShouldNotPhoneHome();
return new HazelcastCoreTopologyService( config, myself, logProvider );
return new HazelcastCoreTopologyService( config, myself, discoveredMemberRepository, logProvider );
}

@Override
Expand Down
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.discovery;

import org.junit.Rule;
import org.junit.Test;

import java.util.Set;

import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.rule.TestDirectory;

import static org.junit.Assert.assertEquals;
import static org.neo4j.helpers.collection.Iterators.set;

public class DiscoveredMemberRepositoryTest
{
@Rule
public final TestDirectory testDirectory = TestDirectory.testDirectory();

private final FileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction();

@Test
public void shouldStoreDiscoveredMembers() throws Exception
{
// given
DiscoveredMemberRepository discoveredMemberRepositoryA =
new DiscoveredMemberRepository( testDirectory.directory(), fileSystem, NullLogProvider.getInstance() );

Set<AdvertisedSocketAddress> members =
set( new AdvertisedSocketAddress( "localhost:5003" ), new AdvertisedSocketAddress( "localhost:5004" ),
new AdvertisedSocketAddress( "localhost:5005" ) );

discoveredMemberRepositoryA.store( members );

// when
DiscoveredMemberRepository discoveredMemberRepositoryB =
new DiscoveredMemberRepository( testDirectory.directory(), fileSystem, NullLogProvider.getInstance() );

// then
assertEquals(members, discoveredMemberRepositoryB.previouslyDiscoveredMembers() );
}
}
Expand Up @@ -52,7 +52,8 @@ public class SharedDiscoveryService implements DiscoveryServiceFactory
private AtomicReference<ClusterId> clusterId = new AtomicReference<>();

@Override
public CoreTopologyService coreTopologyService( Config config, MemberId myself, LogProvider logProvider )
public CoreTopologyService coreTopologyService( Config config, MemberId myself,
DiscoveredMemberRepository discoveredMemberRepository, LogProvider logProvider )
{
return new SharedDiscoveryCoreClient( this, myself, logProvider, config );
}
Expand Down
Expand Up @@ -19,8 +19,8 @@
*/
package org.neo4j.coreedge.scenarios;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

import java.util.List;

Expand Down Expand Up @@ -49,7 +49,7 @@ public class ClusterMembershipChangeIT
public final ClusterRule clusterRule = new ClusterRule( getClass() ).withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() )
.withNumberOfCoreMembers( 3 );

@Ignore( "Incomplete, HC will hang waiting for others to join." )
@Test
public void newMemberNotInInitialMembersConfig() throws Throwable
{
// when
Expand Down

0 comments on commit 9d3151b

Please sign in to comment.