Skip to content

Commit

Permalink
ZOOKEEPER-3188: improve based on code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
symat committed Aug 14, 2019
1 parent 6c4220a commit 42a52a6
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 40 deletions.
Expand Up @@ -19,10 +19,19 @@
package org.apache.zookeeper.server.admin;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.Environment.Entry;
import org.apache.zookeeper.Version;
Expand All @@ -36,7 +45,9 @@
import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.MultipleAddresses;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
Expand Down Expand Up @@ -620,57 +631,53 @@ public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs)
CommandResponse response = initializeResponse();
if (zkServer instanceof QuorumZooKeeperServer) {
QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
VotingView votingView = new VotingView(peer.getVotingView());
Map<Long, QuorumServerView> votingView = peer.getVotingView().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new QuorumServerView(e.getValue())));
response.put("current_config", votingView);
} else {
response.put("current_config", Collections.emptyMap());
}
return response;
}

private static class QuorumServerView {

private static class VotingView {
private final Map<Long, String> view;
@JsonProperty
private List<String> serverAddresses;

VotingView(Map<Long,QuorumPeer.QuorumServer> view) {
this.view = view.entrySet().stream()
.filter(e -> e.getValue().addr != null)
.collect(Collectors.toMap(Map.Entry::getKey,
e -> String.format("%s:%s%s",
getMultiAddressString(e.getValue()),
e.getValue().type.equals(QuorumPeer.LearnerType.PARTICIPANT) ? "participant" : "observer",
e.getValue().clientAddr ==null || e.getValue().isClientAddrFromStatic ? "" :
String.format(";%s:%d",
QuorumPeer.QuorumServer.delimitedHostString(e.getValue().clientAddr),
e.getValue().clientAddr.getPort())),
(v1, v2) -> v1, // cannot get duplicates as this straight draws from the other map
TreeMap::new));
}

private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
return qs.addr.getAllAddresses().stream()
.map(address -> getSingleAddressString(qs, address))
.collect(Collectors.joining(","));
}
@JsonProperty
private List<String> electionAddresses;

private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) {
final String addressHostString = address.getHostString();
final String delimitedHostString = QuorumPeer.QuorumServer.delimitedHostString(address);
@JsonProperty
private String clientAddress;

Optional<InetSocketAddress> matchingElectionAddress = qs.electionAddr.getAllAddresses().stream()
.filter(electionAddress -> electionAddress.getHostString().equals(addressHostString))
.findFirst();
final String electionPort = matchingElectionAddress.map(e-> ":" + e.getPort()).orElse("");
@JsonProperty
private String learnerType;

return String.format("%s:%d%s", delimitedHostString, address.getPort(), electionPort);
public QuorumServerView(QuorumPeer.QuorumServer quorumServer) {
this.serverAddresses = getMultiAddressString(quorumServer.addr);
this.electionAddresses = getMultiAddressString(quorumServer.electionAddr);
this.learnerType = quorumServer.type.equals(LearnerType.PARTICIPANT) ? "participant" : "observer";
this.clientAddress = getAddressString(quorumServer.clientAddr);
}

@JsonAnyGetter
public Map<Long, String> getView() {
return view;
private static List<String> getMultiAddressString(MultipleAddresses multipleAddresses) {
if(multipleAddresses == null) {
return Collections.emptyList();
}

return multipleAddresses.getAllAddresses().stream()
.map(QuorumServerView::getAddressString)
.collect(Collectors.toList());
}
}

private static String getAddressString(InetSocketAddress address) {
if(address == null) {
return "";
}
return String.format("%s:%d", QuorumPeer.QuorumServer.delimitedHostString(address), address.getPort());
}
}

}

Expand Down
Expand Up @@ -311,7 +311,7 @@ ServerSocket createServerSocket(InetSocketAddress address, boolean portUnificati
serverSocket.bind(address);
return serverSocket;
} catch (BindException e) {
LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
LOG.error("Couldn't bind to " + address.toString(), e);
throw e;
}
}
Expand Down
Expand Up @@ -114,7 +114,7 @@ public class QuorumCnxManager {
/*
* Protocol identifier used among peers
*/
public static final long PROTOCOL_VERSION = -65536L;
public static final long PROTOCOL_VERSION = -65535L;

/*
* Max buffer size to be read from the network.
Expand Down
Expand Up @@ -292,6 +292,12 @@ public void testWatchSummary() throws IOException, InterruptedException {
new Field("num_total_watches", Integer.class));
}

@Test
public void testVotingViewCommand() throws IOException, InterruptedException {
testCommand("voting_view",
new Field("current_config", Map.class));
}

@Test
public void testConsCommandSecureOnly() {
// Arrange
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -58,6 +59,8 @@
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class CnxManagerTest extends ZKTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class);
protected static final int THRESHOLD = 4;
Expand Down Expand Up @@ -608,7 +611,7 @@ public void testInitialMessage() throws Exception {
Assert.fail("bad hostport accepted");
} catch (InitialMessage.InitialMessageException ex) {}

// good message
// good message, single election address
try {

hostport = "10.0.0.2:3888";
Expand All @@ -621,6 +624,30 @@ public void testInitialMessage() throws Exception {
// now parse it
din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
assertEquals(new Long(5L), msg.sid);
assertEquals(Arrays.asList(new InetSocketAddress("10.0.0.2", 3888)), msg.electionAddr);
} catch (InitialMessage.InitialMessageException ex) {
Assert.fail(ex.toString());
}

// good message, multiple election addresses (ZOOKEEPER-3188)
try {

hostport = "1.1.1.1:9999,2.2.2.2:8888,3.3.3.3:7777";
bos = new ByteArrayOutputStream();
dout = new DataOutputStream(bos);
dout.writeLong(5L); // sid
dout.writeInt(hostport.getBytes().length);
dout.writeBytes(hostport);

// now parse it
din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
assertEquals(new Long(5L), msg.sid);
assertEquals(Arrays.asList(new InetSocketAddress("1.1.1.1", 9999),
new InetSocketAddress("2.2.2.2", 8888),
new InetSocketAddress("3.3.3.3", 7777)),
msg.electionAddr);
} catch (InitialMessage.InitialMessageException ex) {
Assert.fail(ex.toString());
}
Expand Down

0 comments on commit 42a52a6

Please sign in to comment.