Skip to content

Commit

Permalink
HDDS-3273. getConf does not return all OM addresses. (apache#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
swagle authored and isa committed Mar 29, 2020
1 parent 37281ac commit 562ac8b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -89,6 +93,34 @@ public static InetSocketAddress getOmAddress(Configuration conf) {
return NetUtils.createSocketAddr(getOmRpcAddress(conf));
}

/**
* Return list of OM addresses by service ids - when HA is enabled.
*
* @param conf {@link Configuration}
* @return {service.id -> [{@link InetSocketAddress}]}
*/
public static Map<String, List<InetSocketAddress>> getOmHAAddressesById(
Configuration conf) {
Map<String, List<InetSocketAddress>> result = new HashMap<>();
for (String serviceId : conf.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY)) {
if (!result.containsKey(serviceId)) {
result.put(serviceId, new ArrayList<>());
}
for (String nodeId : getOMNodeIds(conf, serviceId)) {
String rpcAddr = getOmRpcAddress(conf,
addKeySuffixes(OZONE_OM_ADDRESS_KEY, serviceId, nodeId));
if (rpcAddr != null) {
result.get(serviceId).add(NetUtils.createSocketAddr(rpcAddr));
} else {
LOG.warn("Address undefined for nodeId: {} for service {}", nodeId,
serviceId);
}
}
}
return result;
}

/**
* Retrieve the socket address that is used by OM.
* @param conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,11 @@ static class OzoneManagersCommandHandler extends CommandHandler {
@Override
public int doWorkInternal(OzoneGetConf tool, String[] args)
throws IOException {
tool.printOut(OmUtils.getOmAddress(tool.getConf()).getHostName());
if (OmUtils.isServiceIdsDefined(tool.getConf())) {
tool.printOut(OmUtils.getOmHAAddressesById(tool.getConf()).toString());
} else {
tool.printOut(OmUtils.getOmAddress(tool.getConf()).getHostName());
}
return 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.ozone;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.io.IOUtils;
import org.junit.Rule;
Expand All @@ -31,9 +32,13 @@
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -113,6 +118,26 @@ public void createOMDirThrowsIfCannotCreate() throws IOException {
// expecting exception
}

@Test
public void testGetOmHAAddressesById() {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OZONE_OM_SERVICE_IDS_KEY, "ozone1");
conf.set("ozone.om.nodes.ozone1", "node1,node2,node3");
conf.set("ozone.om.address.ozone1.node1", "1.1.1.1");
conf.set("ozone.om.address.ozone1.node2", "1.1.1.2");
conf.set("ozone.om.address.ozone1.node3", "1.1.1.3");
Map<String, List<InetSocketAddress>> addresses =
OmUtils.getOmHAAddressesById(conf);
assertFalse(addresses.isEmpty());
List<InetSocketAddress> rpcAddrs = addresses.get("ozone1");
assertFalse(rpcAddrs.isEmpty());
assertTrue(rpcAddrs.stream().anyMatch(
a -> a.getAddress().getHostAddress().equals("1.1.1.1")));
assertTrue(rpcAddrs.stream().anyMatch(
a -> a.getAddress().getHostAddress().equals("1.1.1.2")));
assertTrue(rpcAddrs.stream().anyMatch(
a -> a.getAddress().getHostAddress().equals("1.1.1.3")));
}
}

class TestDBCheckpoint implements DBCheckpoint {
Expand Down

0 comments on commit 562ac8b

Please sign in to comment.