Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private Mono<Cluster> doStart0() {
return TransportImpl.bind(config.transportConfig())
.flatMap(
transport1 -> {
localMember = createLocalMember(transport1.address().port());
localMember = createLocalMember(transport1.address());
transport = new SenderAwareTransport(transport1, localMember.address());

cidGenerator = new CorrelationIdGenerator(localMember.id());
Expand Down Expand Up @@ -350,18 +350,17 @@ private Flux<MembershipEvent> listenMembership() {
* Creates and prepares local cluster member. An address of member that's being constructed may be
* overriden from config variables.
*
* @param listenPort transport listen port
* @param address transport address
* @return local cluster member with cluster address and cluster member id
*/
private Member createLocalMember(int listenPort) {
String localAddress = Address.getLocalIpAddress().getHostAddress();
Integer port = Optional.ofNullable(config.memberPort()).orElse(listenPort);
private Member createLocalMember(Address address) {
int port = Optional.ofNullable(config.memberPort()).orElse(address.port());

// calculate local member cluster address
Address memberAddress =
Optional.ofNullable(config.memberHost())
.map(memberHost -> Address.create(memberHost, port))
.orElseGet(() -> Address.create(localAddress, listenPort));
.map(host -> Address.create(host, port))
.orElseGet(() -> Address.create(address.host(), port));

return new Member(Member.generateId(), config.memberAlias(), memberAddress);
}
Expand Down Expand Up @@ -507,11 +506,11 @@ public boolean isShutdown() {
private static class SenderAwareTransport implements Transport {

private final Transport transport;
private final Address sender;
private final Address address;

private SenderAwareTransport(Transport transport, Address sender) {
private SenderAwareTransport(Transport transport, Address address) {
this.transport = Objects.requireNonNull(transport);
this.sender = Objects.requireNonNull(sender);
this.address = Objects.requireNonNull(address);
}

@Override
Expand Down Expand Up @@ -545,7 +544,7 @@ public Flux<Message> listen() {
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(sender).build();
return Message.with(message).sender(address).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -166,15 +167,40 @@ public MembershipProtocolImpl(
.subscribe(this::onMemberRemoved)));
}

// Remove duplicates and local address
// Remove duplicates and local address(es)
private List<Address> cleanUpSeedMembers(Collection<Address> seedMembers) {
InetAddress localIpAddress = Address.getLocalIpAddress();

String hostAddress = localIpAddress.getHostAddress();
String hostName = localIpAddress.getHostName();

Address memberAddr = localMember.address();
Address transportAddr = transport.address();
Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port());
Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port());
Address memberAddByHostName = Address.create(hostName, memberAddr.port());
Address transportAddrByHostName = Address.create(hostName, transportAddr.port());

return new LinkedHashSet<>(seedMembers)
.stream()
.filter(address -> !address.equals(localMember.address()))
.filter(address -> !address.equals(transport.address()))
.filter(addr -> checkAddressesNotEqual(addr, memberAddr))
.filter(addr -> checkAddressesNotEqual(addr, transportAddr))
.filter(addr -> checkAddressesNotEqual(addr, memberAddrByHostAddress))
.filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostAddress))
.filter(addr -> checkAddressesNotEqual(addr, memberAddByHostName))
.filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostName))
.collect(Collectors.toList());
}

private boolean checkAddressesNotEqual(Address address0, Address address1) {
if (!address0.equals(address1)) {
return true;
} else {
LOGGER.warn("Filtering out seed address: {}", address0);
return false;
}
}

@Override
public Flux<MembershipEvent> listen() {
return subject.onBackpressureBuffer();
Expand Down
38 changes: 32 additions & 6 deletions cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.scalecube.cluster.membership.MembershipEvent.Type;
import io.scalecube.cluster.metadata.SimpleMapMetadataCodec;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -29,6 +30,7 @@
public class ClusterTest extends BaseTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
public static final int CONNECT_TIMEOUT = 3000;

@Test
public void testMembersAccessFromScheduler() {
Expand All @@ -50,29 +52,53 @@ public void testMembersAccessFromScheduler() {
}

@Test
public void testJoinLocalhostIgnored() {
Address[] addresses = {Address.from("localhost:4801"), Address.from("127.0.0.1:4801")};
public void testJoinLocalhostIgnored() throws InterruptedException {
InetAddress localIpAddress = Address.getLocalIpAddress();
Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 4801);
Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 4801);
Address[] addresses = {
Address.from("localhost:4801"),
Address.from("127.0.0.1:4801"),
Address.from("127.0.1.1:4801"),
localAddressByHostname,
localAddressByIp
};

// Start seed node
Cluster seedNode =
new ClusterImpl()
.transport(opts -> opts.port(4801).connectTimeout(500))
.transport(opts -> opts.port(4801).connectTimeout(CONNECT_TIMEOUT))
.membership(opts -> opts.seedMembers(addresses))
.startAwait();

Thread.sleep(CONNECT_TIMEOUT);

Collection<Member> otherMembers = seedNode.otherMembers();
assertEquals(0, otherMembers.size(), "otherMembers: " + otherMembers);
}

@Test
public void testJoinLocalhostIgnoredWithOverride() {
public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException {
InetAddress localIpAddress = Address.getLocalIpAddress();
Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 7878);
Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 7878);
Address[] addresses = {
Address.from("localhost:7878"),
Address.from("127.0.0.1:7878"),
Address.from("127.0.1.1:7878"),
localAddressByHostname,
localAddressByIp
};

// Start seed node
Cluster seedNode =
new ClusterImpl(new ClusterConfig().memberHost("localhost").memberPort(7878))
.transport(opts -> opts.port(7878).connectTimeout(500))
.membership(opts -> opts.seedMembers(Address.from("localhost:7878")))
.transport(opts -> opts.port(7878).connectTimeout(CONNECT_TIMEOUT))
.membership(opts -> opts.seedMembers(addresses))
.startAwait();

Thread.sleep(CONNECT_TIMEOUT);

Collection<Member> otherMembers = seedNode.otherMembers();
assertEquals(0, otherMembers.size(), "otherMembers: " + otherMembers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -98,7 +99,7 @@ public TransportImpl(TransportConfig config) {
*/
private TransportImpl(DisposableServer server, TransportImpl other) {
this.server = server;
this.address = Address.create(server.address().getHostString(), server.address().getPort());
this.address = prepareAddress(server);
this.config = other.config;
this.loopResources = other.loopResources;
this.messagesSubject = other.messagesSubject;
Expand All @@ -116,6 +117,16 @@ private TransportImpl(DisposableServer server, TransportImpl other) {
.subscribe(null, ex -> LOGGER.warn("Exception occurred on transport stop: " + ex));
}

private static Address prepareAddress(DisposableServer server) {
InetAddress address = server.address().getAddress();
int port = server.address().getPort();
if (address.isAnyLocalAddress()) {
return Address.create(Address.getLocalIpAddress().getHostAddress(), port);
} else {
return Address.create(address.getHostAddress(), port);
}
}

/**
* Init transport with the default configuration synchronously. Starts to accept connections on
* local address.
Expand Down Expand Up @@ -171,15 +182,22 @@ public Mono<Transport> bind0() {
.handle(this::onMessage)
.bind()
.doOnSuccess(
server ->
LOGGER.debug("Bound cluster transport on {}:{}", server.host(), server.port()))
server -> {
InetAddress address = server.address().getAddress();
if (address.isAnyLocalAddress()) {
LOGGER.debug("Bound cluster transport on *:{}", server.port());
} else {
LOGGER.debug(
"Bound cluster transport on {}:{}", address.getHostAddress(), server.port());
}
})
.doOnError(
ex ->
LOGGER.error(
"Failed to bind cluster transport on port={}, cause: {}",
config.port(),
ex.toString()))
.map(this::onBind);
.map(server -> new TransportImpl(server, this));
}

@Override
Expand Down Expand Up @@ -270,10 +288,6 @@ private Message toMessage(ByteBuf byteBuf) {
}
}

private TransportImpl onBind(DisposableServer server) {
return new TransportImpl(server, this);
}

private Mono<? extends Void> send0(Connection conn, Message message) {
// do send
return conn.outbound()
Expand Down