Skip to content

Commit

Permalink
Fix tests after cluster update
Browse files Browse the repository at this point in the history
The listen address was being used for discovering other members.
The correct thing is to use the advertised address.
Hazelcast does not understand addresses which resolve to
the same endpoint but are textually different.

Also aligned all the different names for initial hosts, which
is the historic name.
  • Loading branch information
martinfurmanski committed Jul 7, 2017
1 parent 08b9c41 commit 86cebb4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
Expand Up @@ -41,7 +41,6 @@
import java.util.stream.IntStream;

import org.neo4j.causalclustering.PortAuthority;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.core.LeaderCanWrite;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
Expand All @@ -56,7 +55,6 @@
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.SocketAddressParser;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
Expand Down Expand Up @@ -157,12 +155,12 @@ public CoreClusterMember addCoreMemberWithId( int memberId )
private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
{
List<AdvertisedSocketAddress> initialMembers = extractDiscoveryListenAddresses( coreMembers );
List<AdvertisedSocketAddress> initialHosts = extractInitialHosts( coreMembers );
CoreClusterMember coreClusterMember = createCoreClusterMember(
memberId,
PortAuthority.allocatePort(),
DEFAULT_CLUSTER_SIZE,
initialMembers,
initialHosts,
recordFormat,
extraParams,
instanceExtraParams
Expand All @@ -189,10 +187,10 @@ public ReadReplica addReadReplicaWithIdAndMonitors( @SuppressWarnings( "SamePara

private ReadReplica addReadReplica( int memberId, String recordFormat, Monitors monitors )
{
List<AdvertisedSocketAddress> hazelcastAddresses = extractDiscoveryListenAddresses( coreMembers );
List<AdvertisedSocketAddress> initialHosts = extractInitialHosts( coreMembers );
ReadReplica member = createReadReplica(
memberId,
hazelcastAddresses,
initialHosts,
readReplicaParams,
instanceReadReplicaParams,
recordFormat,
Expand Down Expand Up @@ -407,26 +405,26 @@ private boolean isLockExpired( Throwable e )
LockSessionExpired;
}

private static List<AdvertisedSocketAddress> extractDiscoveryListenAddresses( Map<Integer,CoreClusterMember> coreMembers )
private List<AdvertisedSocketAddress> extractInitialHosts( Map<Integer,CoreClusterMember> coreMembers )
{
return coreMembers.values().stream()
.map( member -> member.settingValue( CausalClusteringSettings.discovery_listen_address.name() ) )
.map( settingValue -> SocketAddressParser.socketAddress( settingValue, AdvertisedSocketAddress::new ) )
.map( CoreClusterMember::discoveryPort )
.map( port -> new AdvertisedSocketAddress( advertisedAddress, port ) )
.collect( toList() );
}

private void createCoreMembers( final int noOfCoreMembers,
List<AdvertisedSocketAddress> addresses, Map<String,String> extraParams,
List<AdvertisedSocketAddress> initialHosts, Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
{
for ( int i = 0; i < addresses.size(); i++ )
for ( int i = 0; i < initialHosts.size(); i++ )
{
int discoveryListenAddress = addresses.get( i ).getPort();
int discoveryListenAddress = initialHosts.get( i ).getPort();
CoreClusterMember coreClusterMember = createCoreClusterMember(
i,
discoveryListenAddress,
noOfCoreMembers,
addresses,
initialHosts,
recordFormat,
extraParams,
instanceExtraParams
Expand All @@ -438,7 +436,7 @@ private void createCoreMembers( final int noOfCoreMembers,
private CoreClusterMember createCoreClusterMember( int serverId,
int hazelcastPort,
int clusterSize,
List<AdvertisedSocketAddress> addresses,
List<AdvertisedSocketAddress> initialHosts,
String recordFormat,
Map<String, String> extraParams,
Map<String, IntFunction<String>> instanceExtraParams )
Expand All @@ -458,7 +456,7 @@ private CoreClusterMember createCoreClusterMember( int serverId,
httpPort,
backupPort,
clusterSize,
addresses,
initialHosts,
discoveryServiceFactory,
recordFormat,
parentDir,
Expand All @@ -470,7 +468,7 @@ private CoreClusterMember createCoreClusterMember( int serverId,
}

private ReadReplica createReadReplica( int serverId,
List<AdvertisedSocketAddress> coreMemberHazelcastAddresses,
List<AdvertisedSocketAddress> initialHosts,
Map<String, String> extraParams,
Map<String, IntFunction<String>> instanceExtraParams,
String recordFormat,
Expand All @@ -488,7 +486,7 @@ private ReadReplica createReadReplica( int serverId,
httpPort,
txPort,
backupPort, discoveryServiceFactory,
coreMemberHazelcastAddresses,
initialHosts,
extraParams,
instanceExtraParams,
recordFormat,
Expand Down Expand Up @@ -537,7 +535,7 @@ private void startReadReplicas( ExecutorService executor ) throws InterruptedExc
}

private void createReadReplicas( int noOfReadReplicas,
final List<AdvertisedSocketAddress> coreMemberAddresses,
final List<AdvertisedSocketAddress> initialHosts,
Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams,
String recordFormat )
Expand All @@ -546,7 +544,7 @@ private void createReadReplicas( int noOfReadReplicas,
{
ReadReplica readReplica = createReadReplica(
i,
coreMemberAddresses,
initialHosts,
extraParams,
instanceExtraParams,
recordFormat,
Expand Down
Expand Up @@ -64,10 +64,11 @@ public class CoreClusterMember implements ClusterMember
private final Map<String, String> config = stringMap();
private final int serverId;
private final String boltAdvertisedSocketAddress;
private final int discoveryPort;
private CoreGraphDatabase database;

public CoreClusterMember( int serverId,
int hazelcastPort,
int discoveryPort,
int txPort,
int raftPort,
int boltPort,
Expand All @@ -84,14 +85,15 @@ public CoreClusterMember( int serverId,
String advertisedAddress )
{
this.serverId = serverId;
this.discoveryPort = discoveryPort;

String initialMembers = addresses.stream().map( AdvertisedSocketAddress::toString ).collect( joining( "," ) );
boltAdvertisedSocketAddress = advertisedAddress( advertisedAddress, boltPort );

config.put( ClusterSettings.mode.name(), ClusterSettings.Mode.CORE.name() );
config.put( GraphDatabaseSettings.default_advertised_address.name(), advertisedAddress );
config.put( CausalClusteringSettings.initial_discovery_members.name(), initialMembers );
config.put( CausalClusteringSettings.discovery_listen_address.name(), listenAddress( listenAddress, hazelcastPort ) );
config.put( CausalClusteringSettings.discovery_listen_address.name(), listenAddress( listenAddress, discoveryPort ) );
config.put( CausalClusteringSettings.transaction_listen_address.name(), listenAddress( listenAddress, txPort ) );
config.put( CausalClusteringSettings.raft_listen_address.name(), listenAddress( listenAddress, raftPort ) );
config.put( CausalClusteringSettings.cluster_topology_refresh.name(), "1000ms" );
Expand Down Expand Up @@ -240,4 +242,9 @@ public void stopCatchupServer() throws Throwable
{
database.getDependencyResolver().resolveDependency( CatchupServer.class).stop();
}

int discoveryPort()
{
return discoveryPort;
}
}

0 comments on commit 86cebb4

Please sign in to comment.