HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.
(backported by Chao)

Summary:
This is needed for the observer NameNode. Note that this backport contains
only part of the original patch, namely:

  1. the changes to tests to support multiple NameNodes
  2. the changes to BlockTokenSecretManager and BootstrapStandby to support
     multiple NNs

Notably, it does not include the original patch's changes to EditLogTailer
and StandbyCheckpointer. Those only matter for multiple standby NNs, which
we don't need, and they also contain a few bugs.
atm authored and sunchao committed Dec 4, 2017
1 parent 1a6b994 commit c7527fa
Showing 28 changed files with 1,178 additions and 509 deletions.
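
For context, the feature this backport builds toward: with this change,
dfs.ha.namenodes.<nsId> may list more than two NameNode IDs. A minimal
sketch of such a configuration, assuming a hypothetical nameservice
"mycluster" with three NNs (all IDs and hosts are illustrative; the key
names are the standard HDFS HA keys):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ThreeNameNodeConfSketch {
      // Hypothetical IDs/hosts; only the number of NN IDs is the point here.
      public static Configuration threeNodeHaConf() {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2,nn3");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "host1:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "host2:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn3", "host3:8020");
        return conf;
      }
    }
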
org/apache/hadoop/hdfs/HAUtil.java
@@ -144,14 +144,14 @@ public static String getNameNodeIdFromAddress(final Configuration conf,
}
return null;
}

/**
* Get the NN ID of the other node in an HA setup.
*
* Get the NN IDs of all other nodes in an HA setup.
*
* @param conf the configuration of this node
* @return the NN ID of the other node in this nameservice
* @return the NN IDs of all other nodes in this nameservice
*/
public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) {
public static List<String> getNameNodeIdOfOtherNodes(Configuration conf, String nsId) {
Preconditions.checkArgument(nsId != null,
"Could not determine namespace id. Please ensure that this " +
"machine is one of the machines listed as a NN RPC address, " +
@@ -165,43 +165,59 @@ public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) {
DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
nsId),
nsId);
Preconditions.checkArgument(nnIds.size() == 2,
"Expected exactly 2 NameNodes in namespace '%s'. " +
"Instead, got only %s (NN ids were '%s'",
nsId, nnIds.size(), Joiner.on("','").join(nnIds));
Preconditions.checkArgument(nnIds.size() >= 2,
"Expected at least 2 NameNodes in namespace '%s'. " +
"Instead, got only %s (NN ids were '%s')",
nsId, nnIds.size(), Joiner.on("','").join(nnIds));
Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
"Could not determine own NN ID in namespace '%s'. Please " +
"ensure that this node is one of the machines listed as an " +
"NN RPC address, or configure " + DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY,
nsId);

ArrayList<String> nnSet = Lists.newArrayList(nnIds);
nnSet.remove(myNNId);
assert nnSet.size() == 1;
return nnSet.get(0);
ArrayList<String> namenodes = Lists.newArrayList(nnIds);
namenodes.remove(myNNId);
assert namenodes.size() >= 1;
return namenodes;
}

/**
* Given the configuration for this node, return a Configuration object for
* the other node in an HA setup.
*
*
* @param myConf the configuration of this node
* @return the configuration of the other node in an HA setup
*/
public static Configuration getConfForOtherNode(
public static Configuration getConfForOtherNode(Configuration myConf) {
List<Configuration> otherConfs = getConfForOtherNodes(myConf);
Preconditions.checkArgument(otherConfs.size() == 1,
"Expected to only find one other NN in the current HA setting,"
+ " but got " + otherConfs.size() + ".");
return otherConfs.get(0);
}


/**
* Given the configuration for this node, return a Configuration object for
* all the other nodes in an HA setup.
*
* @param myConf the configuration of this node
* @return the configuration of all other nodes in an HA setup
*/
public static List<Configuration> getConfForOtherNodes(
Configuration myConf) {

String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
String otherNn = getNameNodeIdOfOtherNode(myConf, nsId);

// Look up the address of the active NN.
Configuration confForOtherNode = new Configuration(myConf);
// unset independent properties
for (String idpKey : HA_SPECIAL_INDEPENDENT_KEYS) {
confForOtherNode.unset(idpKey);
List<String> otherNn = getNameNodeIdOfOtherNodes(myConf, nsId);

// Look up the address of the other NNs
List<Configuration> confs = new ArrayList<Configuration>(otherNn.size());
for (String nn : otherNn) {
Configuration confForOtherNode = new Configuration(myConf);
NameNode.initializeGenericKeys(confForOtherNode, nsId, nn);
confs.add(confForOtherNode);
}
NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn);
return confForOtherNode;
return confs;
}

/**
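
A minimal sketch of how a caller can consume the new list-returning API.
Each Configuration returned by getConfForOtherNodes already has its generic
keys (e.g. dfs.namenode.rpc-address) rewritten for that node via
NameNode.initializeGenericKeys; the class and method names below are
illustrative:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HAUtil;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    public class OtherNameNodesSketch {
      // Print the service RPC address of every other NN in this nameservice.
      public static void printOtherNameNodes(Configuration myConf) {
        for (Configuration otherConf : HAUtil.getConfForOtherNodes(myConf)) {
          InetSocketAddress addr = NameNode.getServiceAddress(otherConf, true);
          System.out.println(addr);
        }
      }
    }
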
org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -53,17 +53,11 @@
@InterfaceAudience.Private
public class BlockTokenSecretManager extends
SecretManager<BlockTokenIdentifier> {
public static final Log LOG = LogFactory
.getLog(BlockTokenSecretManager.class);

// We use these in an HA setup to ensure that the pair of NNs produce block
// token serial numbers that are in different ranges.
private static final int LOW_MASK = ~(1 << 31);

public static final Log LOG = LogFactory.getLog(BlockTokenSecretManager.class);

public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();

private final boolean isMaster;
private int nnIndex;

/**
* keyUpdateInterval is the interval that NN updates its block keys. It should
@@ -78,48 +72,54 @@ public class BlockTokenSecretManager extends
private final Map<Integer, BlockKey> allKeys;
private String blockPoolId;
private final String encryptionAlgorithm;


private final int intRange;
private final int nnRangeStart;

private final SecureRandom nonceGenerator = new SecureRandom();

/**
* Timer object for querying the current time. Separated out for
* unit testing.
*/
private Timer timer;

/**
* Constructor for slaves.
*
*
* @param keyUpdateInterval how often a new key will be generated
* @param tokenLifetime how long an individual token is valid
*/
public BlockTokenSecretManager(long keyUpdateInterval,
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
encryptionAlgorithm);
encryptionAlgorithm, 0, 1);
}

/**
* Constructor for masters.
*
* @param keyUpdateInterval how often a new key will be generated
* @param tokenLifetime how long an individual token is valid
* @param nnIndex namenode index
* @param nnIndex namenode index of the namenode for which we are creating the manager
* @param blockPoolId block pool ID
* @param encryptionAlgorithm encryption algorithm to use
* @param numNNs number of namenodes possible
*/
public BlockTokenSecretManager(long keyUpdateInterval,
long tokenLifetime, int nnIndex, String blockPoolId,
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
String encryptionAlgorithm) {
this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
encryptionAlgorithm);
Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
this.nnIndex = nnIndex;
this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
Preconditions.checkArgument(nnIndex >= 0);
Preconditions.checkArgument(numNNs > 0);
setSerialNo(new SecureRandom().nextInt());
generateKeys();
}

private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
this.intRange = Integer.MAX_VALUE / numNNs;
this.nnRangeStart = intRange * nnIndex;
this.isMaster = isMaster;
this.keyUpdateInterval = keyUpdateInterval;
this.tokenLifetime = tokenLifetime;
@@ -132,7 +132,8 @@ private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,

@VisibleForTesting
public synchronized void setSerialNo(int serialNo) {
this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
// we mod the serial number by the range and then add that times the index
this.serialNo = (serialNo % intRange) + (nnRangeStart);
}

public void setBlockPoolId(String blockPoolId) {
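
The old scheme split the 32-bit serial space between exactly two NNs by
stealing the top bit (nnIndex << 31); the new scheme carves
Integer.MAX_VALUE into numNNs contiguous ranges. A standalone sketch of
the arithmetic (NN count and sample input are illustrative):

    public class SerialRangeSketch {
      public static void main(String[] args) {
        int numNNs = 3;
        int intRange = Integer.MAX_VALUE / numNNs;      // 715827882
        for (int nnIndex = 0; nnIndex < numNNs; nnIndex++) {
          int nnRangeStart = intRange * nnIndex;
          // Mirrors setSerialNo(): fold an arbitrary serial into this NN's range.
          int serialNo = (1234567890 % intRange) + nnRangeStart;
          System.out.println("nnIndex " + nnIndex + ": range starts at "
              + nnRangeStart + ", sample serial " + serialNo);
        }
        // One caveat worth noting: Java's % keeps the dividend's sign, so a
        // negative input (SecureRandom.nextInt() returns negatives about half
        // the time) lands below nnRangeStart and can stray into another NN's
        // range.
      }
    }
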
org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.util.ExitUtil.terminate;

import java.io.IOException;
@@ -52,6 +53,7 @@
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -446,14 +448,21 @@ private static BlockTokenSecretManager createBlockTokenSecretManager(
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);

if (isHaEnabled) {
String thisNnId = HAUtil.getNameNodeId(conf, nsId);
String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
return new BlockTokenSecretManager(updateMin*60*1000L,
lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
encryptionAlgorithm);
// figure out which index we are of the nns
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
String nnId = HAUtil.getNameNodeId(conf, nsId);
int nnIndex = 0;
for (String id : nnIds) {
if (id.equals(nnId)) {
break;
}
nnIndex++;
}
return new BlockTokenSecretManager(updateMin * 60 * 1000L,
lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
} else {
return new BlockTokenSecretManager(updateMin*60*1000L,
lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
}
}

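
The index chosen above is simply this NN's position in the configured ID
collection, which means every node in the nameservice must see the same
dfs.ha.namenodes.<nsId> ordering for the token ranges to stay disjoint. A
standalone sketch of the lookup (IDs are illustrative stand-ins for what
DFSUtilClient.getNameNodeIds and HAUtil.getNameNodeId return):

    import java.util.Arrays;
    import java.util.List;

    public class NnIndexSketch {
      public static void main(String[] args) {
        // Stand-ins for DFSUtilClient.getNameNodeIds(conf, nsId) and
        // HAUtil.getNameNodeId(conf, nsId).
        List<String> nnIds = Arrays.asList("nn1", "nn2", "nn3");
        String myNnId = "nn2";
        int nnIndex = nnIds.indexOf(myNnId);  // position doubles as range index
        System.out.println(myNnId + " gets token-range index " + nnIndex
            + " of " + nnIds.size());
      }
    }
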
org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -23,8 +23,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -79,10 +79,8 @@ public class BootstrapStandby implements Tool, Configurable {
private static final Log LOG = LogFactory.getLog(BootstrapStandby.class);
private String nsId;
private String nnId;
private String otherNNId;
private List<RemoteNameNodeInfo> remoteNNs;

private URL otherHttpAddr;
private InetSocketAddress otherIpcAddr;
private Collection<URI> dirsToFormat;
private List<URI> editUrisToFormat;
private List<URI> sharedEditsUris;
@@ -147,8 +145,8 @@ private void printUsage() {
+ "\twe have enough edits already in the shared directory to start\n"
+ "\tup from the last checkpoint on the active.");
}
private NamenodeProtocol createNNProtocolProxy()

private NamenodeProtocol createNNProtocolProxy(InetSocketAddress otherIpcAddr)
throws IOException {
return NameNodeProxies.createNonHAProxy(getConf(),
otherIpcAddr, NamenodeProtocol.class,
@@ -157,18 +155,36 @@ private NamenodeProtocol createNNProtocolProxy()
}

private int doRun() throws IOException {
NamenodeProtocol proxy = createNNProtocolProxy();
NamespaceInfo nsInfo;
boolean isUpgradeFinalized;
try {
nsInfo = proxy.versionRequest();
isUpgradeFinalized = proxy.isUpgradeFinalized();
} catch (IOException ioe) {
LOG.fatal("Unable to fetch namespace information from active NN at " +
otherIpcAddr + ": " + ioe.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Full exception trace", ioe);
// find the active NN
NamenodeProtocol proxy = null;
NamespaceInfo nsInfo = null;
boolean isUpgradeFinalized = false;
RemoteNameNodeInfo proxyInfo = null;
for (int i = 0; i < remoteNNs.size(); i++) {
proxyInfo = remoteNNs.get(i);
InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
proxy = createNNProtocolProxy(otherIpcAddress);
try {
// Get the namespace from any active NN. If you just formatted the primary NN and are
// bootstrapping the other NNs from that layout, it will only contact the single NN.
// However, if the cluster is already running and you are adding a NN later (e.g.
// replacing a failed NN), then this will bootstrap from any node in the cluster.
nsInfo = proxy.versionRequest();
isUpgradeFinalized = proxy.isUpgradeFinalized();
break;
} catch (IOException ioe) {
LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
+ ": " + ioe.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Full exception trace", ioe);
}
}
}

if (nsInfo == null) {
LOG.fatal(
"Unable to fetch namespace information from any remote NN. Possible NameNodes: "
+ remoteNNs);
return ERR_CODE_FAILED_CONNECT;
}

@@ -183,9 +199,9 @@ private int doRun() throws IOException {
"=====================================================\n" +
"About to bootstrap Standby ID " + nnId + " from:\n" +
" Nameservice ID: " + nsId + "\n" +
" Other Namenode ID: " + otherNNId + "\n" +
" Other NN's HTTP address: " + otherHttpAddr + "\n" +
" Other NN's IPC address: " + otherIpcAddr + "\n" +
" Other Namenode ID: " + proxyInfo.getNameNodeID() + "\n" +
" Other NN's HTTP address: " + proxyInfo.getHttpAddress() + "\n" +
" Other NN's IPC address: " + proxyInfo.getIpcAddress() + "\n" +
" Namespace ID: " + nsInfo.getNamespaceID() + "\n" +
" Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
" Cluster ID: " + nsInfo.getClusterID() + "\n" +
@@ -209,7 +225,7 @@ private int doRun() throws IOException {
}

// download the fsimage from active namenode
int download = downloadImage(storage, proxy);
int download = downloadImage(storage, proxy, proxyInfo);
if (download != 0) {
return download;
}
@@ -300,7 +316,7 @@ private void doUpgrade(NNStorage storage) throws IOException {
}
}

private int downloadImage(NNStorage storage, NamenodeProtocol proxy)
private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
throws IOException {
// Load the newly formatted image, using all of the directories
// (including shared edits)
@@ -322,7 +338,7 @@ private int downloadImage(NNStorage storage, NamenodeProtocol proxy)

// Download that checkpoint into our storage directories.
MD5Hash hash = TransferFsImage.downloadImageToStorage(
otherHttpAddr, imageTxId, storage, true, true);
proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
hash);

@@ -391,18 +407,26 @@ private void parseConfAndFindOtherNN() throws IOException {
throw new HadoopIllegalArgumentException(
"Shared edits storage is not enabled for this namenode.");
}

Configuration otherNode = HAUtil.getConfForOtherNode(conf);
otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
Preconditions.checkArgument(otherIpcAddr.getPort() != 0 &&
!otherIpcAddr.getAddress().isAnyLocalAddress(),
"Could not determine valid IPC address for other NameNode (%s)" +
", got: %s", otherNNId, otherIpcAddr);

final String scheme = DFSUtil.getHttpClientScheme(conf);
otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(
otherIpcAddr.getHostName(), otherNode, scheme).toURL();


remoteNNs = RemoteNameNodeInfo.getRemoteNameNodes(conf, nsId);
// validate the configured NNs
List<RemoteNameNodeInfo> remove = new ArrayList<RemoteNameNodeInfo>(remoteNNs.size());
for (RemoteNameNodeInfo info : remoteNNs) {
InetSocketAddress address = info.getIpcAddress();
LOG.info("Found nn: " + info.getNameNodeID() + ", ipc: " + info.getIpcAddress());
if (address.getPort() == 0 || address.getAddress().isAnyLocalAddress()) {
LOG.error("Could not determine valid IPC address for other NameNode ("
+ info.getNameNodeID() + ") , got: " + address);
remove.add(info);
}
}

// remove any invalid nns
remoteNNs.removeAll(remove);

// make sure we have at least one left to read
Preconditions.checkArgument(!remoteNNs.isEmpty(), "Could not find any valid namenodes!");

dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(
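
The reworked doRun() tries each configured remote NN in order, keeps the
first one that answers, and fails only when none respond. That pattern,
distilled into a standalone sketch (the interface and names are simplified
stand-ins for NamenodeProtocol and RemoteNameNodeInfo):

    import java.io.IOException;
    import java.util.List;

    public class FirstReachableSketch {
      interface Probe<T> {
        T call(String target) throws IOException;
      }

      // Try each candidate in order; the first successful answer wins.
      static <T> T firstReachable(List<String> targets, Probe<T> probe)
          throws IOException {
        for (String target : targets) {
          try {
            return probe.call(target);
          } catch (IOException ioe) {
            System.err.println("Unable to reach " + target + ": "
                + ioe.getMessage());
          }
        }
        throw new IOException("No remote NameNode reachable: " + targets);
      }
    }
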
