From ff29e131ebeec774a2ac8675b5b91ecfe00c6561 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Fri, 4 Aug 2017 10:27:32 -0700
Subject: [PATCH] HDFS observer namenode implementation

This adds a new type of namenode: the observer. An observer is like a
standby NN (in fact they share most of the code), except that it
participates in neither NN failover (i.e., it is not part of the HA
pair) nor checkpointing.

An observer is specified through configuration. First, it needs to be
added to dfs.ha.namenodes, just like a normal namenode, together with
the other configs such as dfs.namenode.rpc-address,
dfs.namenode.http-address, etc. Second, it needs to be listed in a new
config, dfs.ha.observer.namenodes, which differentiates it from the
ordinary active/standby namenodes.

An observer can serve read-only requests from HDFS clients when the
following two conditions are satisfied:

1. the config dfs.client.failover.proxy.provider. is set to
   org.apache.hadoop.hdfs.server.namenode.ha.StaleReadProxyProvider
2. the config dfs.client.enable.stale-read is set to true

This also changes the way edit logs are loaded on the standby/observer
NNs. Instead of loading them all at once, the new implementation loads
them one batch at a time (default batch size is 10K edits) over
multiple iterations, waiting a short amount of time between iterations
(default wait is 100ms). This ensures the global lock is not held too
long while loading edits; otherwise, RPC processing time would suffer.

This patch does not include a mechanism for clients to bound the
staleness using the journal transaction ID: excluding it allows the
observer to be deployed more easily. More specifically, deployment
involves:

1. restarting all datanodes with the updated configs. No binary change
   on the datanodes is required.
2. bootstrapping and starting the observer namenode with the updated
   configs. Existing namenodes do not need to change.

Future tasks:

1. Allow clients to set a bound on observer staleness in terms of time
   (e.g., 2 min). If for some reason the lag in edit tailing exceeds
   the bound, the client-side proxy provider will fail all RPCs over to
   the active namenode.
2. Use the journal transaction ID to enforce a bound on staleness. This
   can be embedded in the RPC header.
3. Allow a new standby/observer to be deployed without a datanode
   restart.
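For illustration, a minimal hdfs-site.xml sketch of the configuration
described above follows. The nameservice ID "mycluster", the namenode
IDs nn1/nn2/nn3, and the observer's host/port are hypothetical
placeholders; only the keys relevant to this patch are listed.

  <!-- The observer nn3 is declared like any other namenode ... -->
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2,nn3</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn3</name>
    <value>observer-host.example.com:8020</value>
  </property>
  <!-- ... and additionally marked as an observer. -->
  <property>
    <name>dfs.ha.observer.namenodes.mycluster</name>
    <value>nn3</value>
  </property>

  <!-- Client side: opt in to stale reads served by the observer. -->
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.StaleReadProxyProvider</value>
  </property>
  <property>
    <name>dfs.client.enable.stale-read</name>
    <value>true</value>
  </property>

The observer itself is bootstrapped like a standby and started with the
new -observer startup option; the start-dfs.sh change in this patch does
this for every host returned by "hdfs getconf -observers".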
--- .../io/retry/RetryInvocationHandler.java | 5 + .../apache/hadoop/io/retry/RetryPolicies.java | 17 +- .../java/org/apache/hadoop/ipc/Client.java | 1 + .../org/apache/hadoop/hdfs/DFSUtilClient.java | 134 +++++++- .../hdfs/client/HdfsClientConfigKeys.java | 4 + .../hadoop/hdfs/protocol/ClientProtocol.java | 21 ++ .../ha/ConfiguredFailoverProxyProvider.java | 18 +- .../hdfs/server/namenode/ha/ReadOnly.java | 35 +++ .../namenode/ha/StaleReadProxyProvider.java | 269 ++++++++++++++++ .../src/main/bin/distribute-exclude.sh | 9 + .../hadoop-hdfs/src/main/bin/start-dfs.sh | 11 + .../hadoop-hdfs/src/main/bin/stop-dfs.sh | 12 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 153 +++++---- .../java/org/apache/hadoop/hdfs/HAUtil.java | 42 ++- .../server/common/HdfsServerConstants.java | 4 +- .../hdfs/server/namenode/FSEditLogLoader.java | 102 ++++-- .../hdfs/server/namenode/FSNamesystem.java | 34 +- .../hadoop/hdfs/server/namenode/NameNode.java | 50 ++- .../server/namenode/NameNodeRpcServer.java | 1 + .../server/namenode/ha/EditLogTailer.java | 30 +- .../hdfs/server/namenode/ha/HAContext.java | 6 + .../server/namenode/ha/ObserverState.java | 69 ++++ .../namenode/ha/RemoteNameNodeInfo.java | 2 +- .../org/apache/hadoop/hdfs/tools/GetConf.java | 21 +- .../src/main/resources/hdfs-default.xml | 21 ++ .../src/site/markdown/HDFSCommands.md | 2 + .../apache/hadoop/hdfs/MiniDFSCluster.java | 90 +++++- .../apache/hadoop/hdfs/MiniDFSNNTopology.java | 52 ++- .../org/apache/hadoop/hdfs/TestDFSUtil.java | 5 +- .../hdfs/qjournal/MiniQJMHACluster.java | 24 +- .../hdfs/server/namenode/ha/HATestUtil.java | 79 ++++- .../namenode/ha/TestObserverNameNode.java | 295 ++++++++++++++++++ .../apache/hadoop/hdfs/tools/TestGetConf.java | 147 ++++++--- 33 files changed, 1578 insertions(+), 187 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StaleReadProxyProvider.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverState.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNameNode.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 6f90ac4390a8f..8589515c789f7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -435,4 +435,9 @@ public void close() throws IOException { public ConnectionId getConnectionId() { return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy()); } + + @VisibleForTesting + public FailoverProxyProvider getProxyProvider() { + return proxyDescriptor.fpp; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index 0c523a5d23e13..9e9864b395f5a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ 
-669,7 +669,8 @@ public RetryAction shouldRetry(Exception e, int retries, e instanceof UnknownHostException || e instanceof StandbyException || e instanceof ConnectTimeoutException || - isWrappedStandbyException(e)) { + isWrappedStandbyException(e) || + isStandbyException(e)) { return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY, getFailoverOrRetrySleepTime(failovers)); } else if (e instanceof RetriableException @@ -734,4 +735,18 @@ static RetriableException getWrappedRetriableException(Exception e) { return unwrapped instanceof RetriableException ? (RetriableException) unwrapped : null; } + + private static boolean isStandbyException(Exception ex) { + Throwable cause = ex.getCause(); + if (cause != null) { + Throwable cause2 = cause.getCause(); + if (cause2 instanceof RemoteException) { + RemoteException remoteException = (RemoteException)cause2; + IOException unwrapRemoteException = + remoteException.unwrapRemoteException(); + return unwrapRemoteException instanceof StandbyException; + } + } + return false; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 79a43fdf81987..37c2c7949fb95 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 189f1c670d012..9f659ea369d9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; import java.io.IOException; @@ -128,18 +129,52 @@ public static Collection getNameServiceIds(Configuration conf) { /** * Namenode HighAvailability related configuration. - * Returns collection of namenode Ids from the configuration. One logical id - * for each namenode in the in the HA setup. + * Returns collection of namenode Ids (including observer NNs) from the configuration. + * One logical id for each namenode in the HA setup. * * @param conf configuration - * @param nsId the nameservice ID to look at, or null for non-federated - * @return collection of namenode Ids + * @param nsId the nameservice ID to look at, or null for + * non-federated + * @return collection of namenode Ids, including observer namenodes. 
*/ public static Collection getNameNodeIds(Configuration conf, String nsId) { String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId); return conf.getTrimmedStringCollection(key); } + /** + * Returns collection of observer namenode Ids from the configuration. + * One logical id for each observer in the HA setup. + * + * @param conf configuration + * @param nsId the nameservice ID to look at, or null for non-federated + * @return collection of observer namenode Ids + */ + public static Collection getObserverNameNodeIds(Configuration conf, String nsId) { + String key = addSuffix(DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX, nsId); + return conf.getTrimmedStringCollection(key); + } + + /** + * Namenode HighAvailability related configuration. + * Returns collection of namenode Ids from the configuration, excluding observer namenodes. + * One logical id for each namenode in the HA setup. + * + * @param conf configuration + * @param nsId the nameservice ID to look at, or null for non-federated + * @return collection of namenode Ids, excluding observer namenodes. + */ + public static Collection getNameNodeIdsExcludingObservers( + Configuration conf, String nsId) { + Collection allNNIds = getNameNodeIds(conf, nsId); + Collection observerNNIds = getObserverNameNodeIds(conf, nsId); + if (!allNNIds.containsAll(observerNNIds)) { + throw new IllegalArgumentException("Observer NameNodes should be part of all NameNodes"); + } + allNNIds.removeAll(observerNNIds); + return allNNIds; + } + /** Add non empty and non null suffix to a key */ static String addSuffix(String key, String suffix) { if (suffix == null || suffix.isEmpty()) { @@ -152,15 +187,30 @@ static String addSuffix(String key, String suffix) { /** * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from - * the configuration. + * the configuration. Note this does not include the NN RPC addresses from + * observer namenodes. * * @param conf configuration - * @return list of InetSocketAddresses + * @return list of InetSocketAddresse, not including those of observer NNs. */ public static Map> getHaNnRpcAddresses( Configuration conf) { return DFSUtilClient.getAddresses(conf, null, - HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); + } + + /** + * Returns list of InetSocketAddress corresponding to observer namenode RPC + * addresses from the configuration. + * + * @param conf configuration + * @return list of InetSocketAddresses for observer namenodes + */ + public static Map> getObserverRpcAddresses( + Configuration conf) { + // Observer namenodes share the same RPC address key with ordinary namenodes + return DFSUtilClient.getObserverAddresses(conf, null, + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); } /** @@ -320,6 +370,15 @@ static String concatSuffixes(String... suffixes) { return Joiner.on(".").skipNulls().join(suffixes); } + /** + * Enum for selecting different types of namenodes (e.g., namenode, observers) + */ + enum NameNodeType { + NAMENODE, + OBSERVER, + ALL + } + /** * Returns the configured address for all NameNodes in the cluster. * @param conf configuration @@ -329,12 +388,34 @@ static String concatSuffixes(String... suffixes) { */ static Map> getAddresses( Configuration conf, String defaultAddress, String... keys) { + return getAddressesInternal(conf, defaultAddress, NameNodeType.NAMENODE, keys); + } + + /** + * Returns the configured address for all Observer NameNodes in the cluster. 
+ * @param conf configuration + * @param defaultAddress default address to return in case key is not found. + * @param keys Set of keys to look for in the order of preference + * @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) + */ + static Map> getObserverAddresses( + Configuration conf, String defaultAddress, String... keys) { + return getAddressesInternal(conf, defaultAddress, NameNodeType.OBSERVER, keys); + } + + /** + * Internal function to get configured addresses for a particular namenode + * type 'type'. + */ + static Map> getAddressesInternal( + Configuration conf, String defaultAddress, NameNodeType type, String... keys) { Collection nameserviceIds = getNameServiceIds(conf); - return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys); + return getAddressesForNsIdsInternal(conf, nameserviceIds, defaultAddress, type, keys); } /** - * Returns the configured address for all NameNodes in the cluster. + * Returns the configured address for both ordinary (active/standby) namenodes + * and observer namenodes in the cluster. * @param conf configuration * @param defaultAddress default address to return in case key is not found. * @param keys Set of keys to look for in the order of preference @@ -344,12 +425,23 @@ static Map> getAddresses( static Map> getAddressesForNsIds( Configuration conf, Collection nsIds, String defaultAddress, String... keys) { + return getAddressesForNsIdsInternal( + conf, nsIds, defaultAddress, NameNodeType.ALL, keys); + } + + /** + * Internal function to get configured addresses for namenodes of type 'type' and + * nameservices with IDs 'nsIds'. + */ + static Map> getAddressesForNsIdsInternal( + Configuration conf, Collection nsIds, String defaultAddress, + NameNodeType type, String... keys) { // Look for configurations of the form [.][.] // across all of the configured nameservices and namenodes. Map> ret = Maps.newLinkedHashMap(); for (String nsId : emptyAsSingletonNull(nsIds)) { Map isas = - getAddressesForNameserviceId(conf, nsId, defaultAddress, keys); + getAddressesForNameserviceIdInternal(conf, nsId, defaultAddress, type, keys); if (!isas.isEmpty()) { ret.put(nsId, isas); } @@ -359,7 +451,27 @@ static Map> getAddressesForNsIds( static Map getAddressesForNameserviceId( Configuration conf, String nsId, String defaultValue, String... keys) { - Collection nnIds = getNameNodeIds(conf, nsId); + return getAddressesForNameserviceIdInternal( + conf, nsId, defaultValue, NameNodeType.NAMENODE, keys); + } + + private static Map getAddressesForNameserviceIdInternal( + Configuration conf, String nsId, String defaultValue, + NameNodeType type, String... 
keys) { + Collection nnIds; + switch (type) { + case NAMENODE: + nnIds = getNameNodeIdsExcludingObservers(conf, nsId); + break; + case OBSERVER: + nnIds = getObserverNameNodeIds(conf, nsId); + break; + case ALL: + nnIds = getNameNodeIds(conf, nsId); + break; + default: + throw new IllegalArgumentException("Invalid namenode type: " + type.name()); + } Map ret = Maps.newHashMap(); for (String nnId : emptyAsSingletonNull(nnIds)) { String suffix = concatSuffixes(nsId, nnId); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 815260f51384d..2845ddadba8dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -71,6 +71,7 @@ public interface HdfsClientConfigKeys { int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470; String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; + String DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX = "dfs.ha.observer.namenodes"; String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled"; boolean DFS_WEBHDFS_ENABLED_DEFAULT = true; String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port"; @@ -117,6 +118,9 @@ public interface HdfsClientConfigKeys { String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout"; long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30; + String DFS_CLIENT_ENABLE_STALE_READ = + "dfs.client.enable.stale-read"; + boolean DFS_CLIENT_ENABLE_STALE_READ_DEFAULT = false; // Much code in hdfs is not yet updated to use these keys. // the initial delay (unit is ms) for locateFollowingBlock, the delay time // will increase exponentially(double) for each retry. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 587a15cee9797..d68875c3efa19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; @@ -124,6 +125,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException; @@ -133,6 +135,7 @@ LocatedBlocks getBlockLocations(String src, long offset, long length) * @throws IOException */ @Idempotent + @ReadOnly FsServerDefaults getServerDefaults() throws IOException; /** @@ -265,6 +268,7 @@ boolean setReplication(String src, short replication) * @return All the in-use block storage policies currently. 
*/ @Idempotent + @ReadOnly BlockStoragePolicy[] getStoragePolicies() throws IOException; /** @@ -307,6 +311,7 @@ void setStoragePolicy(String src, String policyName) * If file/dir src is not found */ @Idempotent + @ReadOnly BlockStoragePolicy getStoragePolicy(String path) throws IOException; /** @@ -437,6 +442,7 @@ LocatedBlock addBlock(String src, String clientName, * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly LocatedBlock getAdditionalDatanode(final String src, final long fileId, final ExtendedBlock blk, final DatanodeInfo[] existings, @@ -671,6 +677,7 @@ boolean mkdirs(String src, FsPermission masked, boolean createParent) * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException; @@ -681,6 +688,7 @@ DirectoryListing getListing(String src, byte[] startAfter, * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException; @@ -752,6 +760,7 @@ SnapshottableDirectoryStatus[] getSnapshottableDirListing() * actual numbers to index into the array. */ @Idempotent + @ReadOnly long[] getStats() throws IOException; /** @@ -780,6 +789,7 @@ DatanodeStorageReport[] getDatanodeStorageReport( * a symlink. */ @Idempotent + @ReadOnly long getPreferredBlockSize(String filename) throws IOException; @@ -919,6 +929,7 @@ RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) * cookie returned from the previous call. */ @Idempotent + @ReadOnly CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException; @@ -954,6 +965,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly HdfsFileStatus getFileInfo(String src) throws IOException; /** @@ -968,6 +980,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly boolean isFileClosed(String src) throws IOException; /** @@ -997,6 +1010,7 @@ CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly ContentSummary getContentSummary(String path) throws IOException; /** @@ -1109,6 +1123,7 @@ void createSymlink(String target, String link, FsPermission dirPerm, * or an I/O error occurred */ @Idempotent + @ReadOnly String getLinkTarget(String path) throws IOException; /** @@ -1379,6 +1394,7 @@ void removeAclEntries(String src, List aclSpec) * Gets the ACLs of files and directories. */ @Idempotent + @ReadOnly AclStatus getAclStatus(String src) throws IOException; /** @@ -1392,6 +1408,7 @@ void createEncryptionZone(String src, String keyName) * Get the encryption zone for a path. */ @Idempotent + @ReadOnly EncryptionZone getEZForPath(String src) throws IOException; @@ -1403,6 +1420,7 @@ EncryptionZone getEZForPath(String src) * @return Batch of encryption zones. 
*/ @Idempotent + @ReadOnly BatchedEntries listEncryptionZones( long prevId) throws IOException; @@ -1436,6 +1454,7 @@ void setXAttr(String src, XAttr xAttr, EnumSet flag) * @throws IOException */ @Idempotent + @ReadOnly List getXAttrs(String src, List xAttrs) throws IOException; @@ -1451,6 +1470,7 @@ List getXAttrs(String src, List xAttrs) * @throws IOException */ @Idempotent + @ReadOnly List listXAttrs(String src) throws IOException; @@ -1513,6 +1533,7 @@ List listXAttrs(String src) * @throws IOException If an I/O error occurred */ @Idempotent + @ReadOnly QuotaUsage getQuotaUsage(String path) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 9023c0da1ae40..88bac51f83bb1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -50,11 +50,11 @@ public class ConfiguredFailoverProxyProvider extends protected final Configuration conf; protected final List> proxies = new ArrayList>(); - private final UserGroupInformation ugi; + protected final UserGroupInformation ugi; protected final Class xface; private int currentProxyIndex = 0; - private final HAProxyFactory factory; + protected final HAProxyFactory factory; public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, Class xface, HAProxyFactory factory) { @@ -113,16 +113,20 @@ public Class getInterface() { @Override public synchronized ProxyInfo getProxy() { AddressRpcProxyPair current = proxies.get(currentProxyIndex); - if (current.namenode == null) { + return getProxyInfo(current); + } + + protected ProxyInfo getProxyInfo(AddressRpcProxyPair p) { + if (p.namenode == null) { try { - current.namenode = factory.createProxy(conf, - current.address, xface, ugi, false, getFallbackToSimpleAuth()); + p.namenode = factory.createProxy(conf, + p.address, xface, ugi, false, getFallbackToSimpleAuth()); } catch (IOException e) { LOG.error("Failed to create RPC proxy to NameNode", e); throw new RuntimeException(e); } } - return new ProxyInfo(current.namenode, current.address.toString()); + return new ProxyInfo<>(p.namenode, p.address.toString()); } @Override @@ -138,7 +142,7 @@ synchronized void incrementProxyIndex() { * A little pair object to store the address and connected RPC proxy object to * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null. */ - private static class AddressRpcProxyPair { + protected static class AddressRpcProxyPair { public final InetSocketAddress address; public T namenode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java new file mode 100644 index 0000000000000..bf92c23a08671 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Used to mark certain method as read only operations. + */ +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@InterfaceStability.Evolving +public @interface ReadOnly { +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StaleReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StaleReadProxyProvider.java new file mode 100644 index 0000000000000..240fa3e420a9a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StaleReadProxyProvider.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.HAUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.io.retry.MultiException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation that supports + * stale read from observer namenode(s). + * + * This constructs a wrapper proxy that sends the request to observer namenode(s), if stale + * read is enabled (by setting {@link HdfsClientConfigKeys#DFS_CLIENT_ENABLE_STALE_READ} to + * true). In case there are multiple observer namenodes, it will try them one by one in case + * the RPC failed. It will fail back to the active namenode after it has exhausted all the + * observer namenodes. + * + * Write requests will still be sent to active NN. Read requests will still go to active NN + * if {@link HdfsClientConfigKeys#DFS_CLIENT_ENABLE_STALE_READ} is set to false. + */ +public class StaleReadProxyProvider extends ConfiguredFailoverProxyProvider { + private static final Logger LOG = LoggerFactory.getLogger(StaleReadProxyProvider.class); + + /** Maximum number of retries on the active NN. This is used if all observer NNs have failed. */ + private static final int MAX_ACTIVE_RETRY_NUM = 2; + + /** Proxies for the observer namenodes */ + private LinkedList> observerProxies = null; + + /** + * Whether stale read has been turned on. If this is false, all read requests will + * still go to active NN. + */ + private final boolean staleReadEnabled; + + /** The last proxy that has been used. 
Only used for testing */ + private volatile ProxyInfo lastProxy = null; + + public StaleReadProxyProvider( + Configuration conf, URI uri, Class xface, HAProxyFactory factory) { + super(conf, uri, xface, factory); + + // Initialize observer namenode list + Map> addressMap = + DFSUtilClient.getObserverRpcAddresses(conf); + Map addressesInNN = addressMap.get(uri.getHost()); + + if (addressesInNN == null || addressesInNN.isEmpty()) { + throw new RuntimeException("Could not find any configured observer " + + "namenode address for URI " + uri); + } + + observerProxies = new LinkedList<>(); + Collection addressesOfNns = addressesInNN.values(); + for (InetSocketAddress address : addressesOfNns) { + observerProxies.add(new AddressRpcProxyPair(address)); + } + + staleReadEnabled = conf.getBoolean( + HdfsClientConfigKeys.DFS_CLIENT_ENABLE_STALE_READ, + HdfsClientConfigKeys.DFS_CLIENT_ENABLE_STALE_READ_DEFAULT); + + if (staleReadEnabled) { + LOG.info("Stale read from observer namenode is enabled"); + } + + // The client may have a delegation token set for the logical + // URI of the cluster. Clone this token to apply to each of the + // underlying IPC addresses so that the IPC code can find it. + // Copied from the parent class. + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); + } + + @SuppressWarnings("unchecked") + @Override + public synchronized ProxyInfo getProxy() { + // We just create a wrapped proxy containing all the proxies + List> observerProxies = new LinkedList<>(); + StringBuilder combinedInfo = new StringBuilder("["); + + for (int i = 0; i < this.observerProxies.size(); i++) { + if (i > 0) { + combinedInfo.append(","); + } + AddressRpcProxyPair p = this.observerProxies.get(i); + ProxyInfo pInfo = getProxyInfo(p); + observerProxies.add(pInfo); + combinedInfo.append(pInfo.proxyInfo); + } + + combinedInfo.append(']'); + T wrappedProxy = (T) Proxy.newProxyInstance( + StaleReadInvocationHandler.class.getClassLoader(), + new Class[]{xface}, + new StaleReadInvocationHandler(observerProxies)); + return new ProxyInfo<>(wrappedProxy, combinedInfo.toString()); + } + + /** + * Check the exception returned by the proxy log a warning message if it's + * not a StandbyException (expected exception). + * @param ex exception to evaluate. + * @param proxyInfo information of the proxy reporting the exception. + */ + private void logProxyException(Exception ex, String proxyInfo) { + if (isStandbyException(ex)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Invocation returned standby exception on [" + + proxyInfo + "]"); + } + } else { + LOG.warn("Invocation returned exception on [" + proxyInfo + "]", ex); + } + } + + /** + * Check if the returned exception is caused by an standby namenode. + * @param ex exception to check. + * @return true if the exception is caused by an standby namenode. False otherwise. + */ + private boolean isStandbyException(Exception ex) { + Throwable cause = ex.getCause(); + if (cause != null) { + Throwable cause2 = cause.getCause(); + if (cause2 instanceof RemoteException) { + RemoteException remoteException = (RemoteException)cause2; + IOException unwrapRemoteException = + remoteException.unwrapRemoteException(); + return unwrapRemoteException instanceof StandbyException; + } + } + return false; + } + + /** + * Check if a method is read-only. + * @return whether the 'method' is a read-only operation. 
+ */ + private boolean isRead(Method method) { + Annotation[] annotations = method.getDeclaredAnnotations(); + for (Annotation annotation : annotations) { + if (annotation.annotationType() == ReadOnly.class) { + return true; + } + } + return false; + } + + @VisibleForTesting + public ProxyInfo getLastProxy() { + return lastProxy; + } + + class StaleReadInvocationHandler implements InvocationHandler { + final List> observerProxies; + + StaleReadInvocationHandler(List> observerProxies) { + this.observerProxies = observerProxies; + Collections.shuffle(this.observerProxies); + } + + void handleInvokeException(Map badResults, + Exception e, String proxyInfo) { + logProxyException(e, proxyInfo); + if (isStandbyException(e)) { + // To let the retry policy identify the remote exception + badResults.put(proxyInfo, (RemoteException) e.getCause().getCause()); + } else { + badResults.put(proxyInfo, e); + } + } + + /** + * Sends read operations to the first observer NN (if enabled), and + * send write operations to the active NN. If a observer NN fails, it is sent + * to the back of the queue and the next is retried. If all observers fail, + * we re-probe all the NNs and retry on the active. + */ + @Override + public Object invoke(Object proxy, final Method method, final Object[] args) + throws Throwable { + Map badResults = new HashMap<>(); + lastProxy = null; + if (staleReadEnabled && isRead(method)) { + List> failedProxies = new LinkedList<>(); + Object retVal = null; + boolean success = false; + Iterator> it = observerProxies.iterator(); + while (it.hasNext()) { + ProxyInfo current = it.next(); + try { + retVal = method.invoke(current.proxy, args); + lastProxy = current; + success = true; + } catch (Exception e) { + it.remove(); + failedProxies.add(current); + handleInvokeException(badResults, e, current.proxyInfo); + } + } + observerProxies.addAll(failedProxies); + if (success) { + return retVal; + } + } + + // If we get here, it means all observer NNs have failed. At this point + // we'll try to fail over to the active NN. + for (int i = 0; i < MAX_ACTIVE_RETRY_NUM; i++) { + Object retVal; + ProxyInfo activeProxy = StaleReadProxyProvider.super.getProxy(); + try { + retVal = method.invoke(activeProxy.proxy, args); + lastProxy = activeProxy; + return retVal; + } catch (Exception e) { + handleInvokeException(badResults, e, activeProxy.proxyInfo); + } + } + + // At this point we should have ALL bad results (Exceptions) + // Or should have returned with successful result. + if (badResults.size() == 1) { + throw badResults.values().iterator().next(); + } else { + throw new MultiException(badResults); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh index 66fc14a2468d3..4e37d07ea2f23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh @@ -53,6 +53,7 @@ if [ ! -f "$excludeFilenameLocal" ] ; then fi namenodes=$("$HADOOP_PREFIX/bin/hdfs" getconf -namenodes) +observers=$("$HADOOP_PREFIX/bin/hdfs" getconf -observers) excludeFilenameRemote=$("$HADOOP_PREFIX/bin/hdfs" getconf -excludeFile) if [ "$excludeFilenameRemote" = '' ] ; then @@ -71,6 +72,14 @@ for namenode in $namenodes ; do if [ "$?" 
!= '0' ] ; then errorFlag='1' ; fi done +echo "Copying exclude file [$excludeFilenameRemote] to observers:" + +for observer in $observers ; do + echo " [$observer]" + scp "$excludeFilenameLocal" "$observer:$excludeFilenameRemote" + if [ "$?" != '0' ] ; then errorFlag='1' ; fi +done + if [ "$errorFlag" = '1' ] ; then echo "Error: transfer of exclude file failed, see error messages above." exit 1 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh index a8c2b98c4e44b..37990b3b4dd75 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh @@ -54,6 +54,7 @@ nameStartOpt="$nameStartOpt $@" # namenodes NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -namenodes) +OBSERVERS=$($HADOOP_PREFIX/bin/hdfs getconf -observers) echo "Starting namenodes on [$NAMENODES]" @@ -62,6 +63,16 @@ echo "Starting namenodes on [$NAMENODES]" --hostnames "$NAMENODES" \ --script "$bin/hdfs" start namenode $nameStartOpt +#--------------------------------------------------------- +# observer namenodes (if any) + +# TODO: should we pass other startup options to observers? +echo "Starting observer namenodes on [$OBSERVERS]" +"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \ + --config "$HADOOP_CONF_DIR" \ + --hostnames "$OBSERVERS" \ + --script "$bin/hdfs" start namenode -observer + #--------------------------------------------------------- # datanodes (using default slaves file) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh index 6a622fae47391..2a5122589f759 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh @@ -34,6 +34,18 @@ echo "Stopping namenodes on [$NAMENODES]" --hostnames "$NAMENODES" \ --script "$bin/hdfs" stop namenode +#--------------------------------------------------------- +# observers + +OBSERVERS=$($HADOOP_PREFIX/bin/hdfs getconf -observers) + +echo "Stopping observer namenodes on [$OBSERVERS]" + +"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \ + --config "$HADOOP_CONF_DIR" \ + --hostnames "$OBSERVERS" \ + --script "$bin/hdfs" stop namenode + +#--------------------------------------------------------- # datanodes (using default slaves file) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 40aeaafe992de..1a00ece8991ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -35,6 +35,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; +import static org.apache.hadoop.hdfs.DFSUtilClient.NameNodeType; import java.io.IOException; import java.io.PrintStream; @@ -482,17 +483,23 @@ public static Map> getSecondaryNameNodeAd /** * Returns list of InetSocketAddresses corresponding to namenodes from the * configuration. - * + * Returns namenode address specifically configured for datanodes (using * service ports), if found. If not, regular RPC address configured for other * clients is returned.
- * + * * @param conf configuration * @return list of InetSocketAddress * @throws IOException on error */ public static Map> getNNServiceRpcAddresses( Configuration conf) throws IOException { + return getNNServiceRpcAddressesInternal(conf, NameNodeType.NAMENODE); + } + + private static Map> + getNNServiceRpcAddressesInternal(Configuration conf, NameNodeType type) + throws IOException { // Use default address as fall back String defaultAddress; try { @@ -503,9 +510,9 @@ public static Map> getNNServiceRpcAddress } Map> addressList = - DFSUtilClient.getAddresses(conf, defaultAddress, - DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, - DFS_NAMENODE_RPC_ADDRESS_KEY); + DFSUtilClient.getAddressesInternal(conf, defaultAddress, type, + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, + DFS_NAMENODE_RPC_ADDRESS_KEY); if (addressList.isEmpty()) { throw new IOException("Incorrect configuration: namenode address " + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or " @@ -515,21 +522,9 @@ public static Map> getNNServiceRpcAddress return addressList; } - /** - * Returns list of InetSocketAddresses corresponding to the namenode - * that manages this cluster. Note this is to be used by datanodes to get - * the list of namenode addresses to talk to. - * - * Returns namenode address specifically configured for datanodes (using - * service ports), if found. If not, regular RPC address configured for other - * clients is returned. - * - * @param conf configuration - * @return list of InetSocketAddress - * @throws IOException on error - */ - public static Map> - getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException { + private static Map> getNNServiceRpcAddressesInternal( + Configuration conf, Collection parentNameServices, + NameNodeType type) throws IOException { // Use default address as fall back String defaultAddress; try { @@ -539,29 +534,12 @@ public static Map> getNNServiceRpcAddress defaultAddress = null; } - Collection parentNameServices = conf.getTrimmedStringCollection - (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); - - if (parentNameServices.isEmpty()) { - parentNameServices = conf.getTrimmedStringCollection - (DFSConfigKeys.DFS_NAMESERVICES); - } else { - // Ensure that the internal service is ineed in the list of all available - // nameservices. - Set availableNameServices = Sets.newHashSet(conf - .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES)); - for (String nsId : parentNameServices) { - if (!availableNameServices.contains(nsId)) { - throw new IOException("Unknown nameservice: " + nsId); - } - } - } - Map> addressList = - DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, - defaultAddress, - DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, - DFS_NAMENODE_RPC_ADDRESS_KEY); + DFSUtilClient.getAddressesForNsIdsInternal(conf, parentNameServices, + defaultAddress, + type, + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, + DFS_NAMENODE_RPC_ADDRESS_KEY); if (addressList.isEmpty()) { throw new IOException("Incorrect configuration: namenode address " + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or " @@ -571,6 +549,66 @@ public static Map> getNNServiceRpcAddress return addressList; } + /** + * Returns list of InetSocketAddresses corresponding to the namenodes (excluding + * observer namenodes) that manages this cluster. + * + * Returns namenode addresses, excluding those for observer namenodes, specifically + * configured for datanodes (using service ports), if found. If not, regular RPC + * address configured for other clients is returned. 
+ * + * @param conf configuration + * @return list of InetSocketAddress + * @throws IOException on error + */ + public static Map> + getNNServiceRpcAddressesForClusterExcludingObservers(Configuration conf) throws IOException { + return getNNServiceRpcAddressesForClusterInternal( + conf, NameNodeType.NAMENODE); + } + + /** + * Returns list of InetSocketAddresses for observer namenodes. + * + * Returns observer namenodes' addresses specifically configured for datanodes (using + * service ports), if found. If not, regular RPC address configured for other + * clients is returned. + * + * @param conf configuration + * @return list of InetSocketAddress + * @throws IOException on error + */ + public static Map> + getObserverNNServiceRpcAddressesForCluster(Configuration conf) throws IOException { + return getNNServiceRpcAddressesForClusterInternal( + conf, NameNodeType.OBSERVER); + } + + /** + * Returns list of InetSocketAddresses corresponding to both the ordinary namenodes + * and observer namenodes. + * + * Returns namenode addresses specifically configured for datanodes (using + * service ports), if found. If not, regular RPC address configured for other + * clients is returned. + * + * @param conf configuration + * @return list of InetSocketAddress + * @throws IOException on error + */ + public static Map> + getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException { + return getNNServiceRpcAddressesForClusterInternal( + conf, NameNodeType.ALL); + } + + private static Map> + getNNServiceRpcAddressesForClusterInternal( + Configuration conf, NameNodeType type) throws IOException { + Collection parentNameServices = getNameServiceIdsForCluster(conf); + return getNNServiceRpcAddressesInternal(conf, parentNameServices, type); + } + /** * Returns list of InetSocketAddresses corresponding to lifeline RPC servers * at namenodes from the configuration. @@ -582,27 +620,37 @@ public static Map> getNNServiceRpcAddress public static Map> getNNLifelineRpcAddressesForCluster(Configuration conf) throws IOException { + Collection parentNameServices = getNameServiceIdsForCluster(conf); + return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null, + DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY); + } - Collection parentNameServices = conf.getTrimmedStringCollection( - DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); - - if (parentNameServices.isEmpty()) { - parentNameServices = conf.getTrimmedStringCollection( + /** + * Returns a collection of nameservices for this cluster. + * + * @param conf configuration + * @return a collection of nameservices + * @throws IOException on error + */ + private static Collection getNameServiceIdsForCluster(Configuration conf) + throws IOException { + Collection nameServices = conf.getTrimmedStringCollection + (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); + if (nameServices.isEmpty()) { + nameServices = conf.getTrimmedStringCollection( DFSConfigKeys.DFS_NAMESERVICES); } else { // Ensure that the internal service is indeed in the list of all available // nameservices. 
Set availableNameServices = Sets.newHashSet(conf .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES)); - for (String nsId : parentNameServices) { + for (String nsId : nameServices) { if (!availableNameServices.contains(nsId)) { throw new IOException("Unknown nameservice: " + nsId); } } } - - return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null, - DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY); + return nameServices; } /** @@ -1089,13 +1137,12 @@ static String[] getSuffixIDs(final Configuration conf, final String addressKey, String nameserviceId = null; String namenodeId = null; int found = 0; - Collection nsIds = DFSUtilClient.getNameServiceIds(conf); for (String nsId : DFSUtilClient.emptyAsSingletonNull(nsIds)) { if (knownNsId != null && !knownNsId.equals(nsId)) { continue; } - + Collection nnIds = DFSUtilClient.getNameNodeIds(conf, nsId); for (String nnId : DFSUtilClient.emptyAsSingletonNull(nnIds)) { if (LOG.isTraceEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 6242e037e4251..63936576ba8d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -146,18 +146,38 @@ public static String getNameNodeIdFromAddress(final Configuration conf, } /** - * Get the NN IDs of all other nodes in an HA setup. + * Get the NN IDs of all other nodes in an HA setup, not including observer NNs. * * @param conf the configuration of this node + * @param nsId the nameservice id * @return the NN IDs of all other nodes in this nameservice */ public static List getNameNodeIdOfOtherNodes(Configuration conf, String nsId) { + return getNameNodeIdOfOtherNodes(conf, nsId, false); + } + + /** + * Get the NN IDs of all other nodes in an HA setup. + * + * @param conf the configuration of this node + * @param nsId the nameservice id + * @param includeObservers whether to include observer NNs in the result + * @return the NN IDs of all other nodes in this nameservice + */ + public static List getNameNodeIdOfOtherNodes( + Configuration conf, String nsId, boolean includeObservers) { Preconditions.checkArgument(nsId != null, "Could not determine namespace id. Please ensure that this " + "machine is one of the machines listed as a NN RPC address, " + "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID); - - Collection nnIds = DFSUtilClient.getNameNodeIds(conf, nsId); + + Collection nnIds; + if (includeObservers) { + nnIds = DFSUtilClient.getNameNodeIds(conf, nsId); + } else { + nnIds = DFSUtilClient.getNameNodeIdsExcludingObservers(conf, nsId); + } + String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY); Preconditions.checkArgument(nnIds != null, "Could not determine namenode ids in namespace '%s'. " + @@ -196,19 +216,31 @@ public static Configuration getConfForOtherNode(Configuration myConf) { return otherConfs.get(0); } + /** + * Given the configuration for this node, return a list of configurations for + * all the other nodes (not including observers) in an HA setup. + * + * @param myConf the configuration of this node + * @return the configuration of all other nodes in an HA setup + */ + public static List getConfForOtherNodes( + Configuration myConf) { + return getConfForOtherNodes(myConf, false); + } /** * Given the configuration for this node, return a Configuration object for * all the other nodes in an HA setup. 
* * @param myConf the configuration of this node + * @param includeObservers whether to include observer NNs in the result * @return the configuration of all other nodes in an HA setup */ public static List getConfForOtherNodes( - Configuration myConf) { + Configuration myConf, boolean includeObservers) { String nsId = DFSUtil.getNamenodeNameServiceId(myConf); - List otherNn = getNameNodeIdOfOtherNodes(myConf, nsId); + List otherNn = getNameNodeIdOfOtherNodes(myConf, nsId, includeObservers); // Look up the address of the other NNs List confs = new ArrayList(otherNn.size()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 6a9f21547e622..6542af97e4602 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -158,7 +158,9 @@ enum StartupOption{ // only used for StorageDirectory.analyzeStorage() in hot swap drive scenario. // TODO refactor StorageDirectory.analyzeStorage() so that we can do away with // this in StartupOption. - HOTSWAP("-hotswap"); + HOTSWAP("-hotswap"), + // Startup the namenode in observer mode. + OBSERVER("-observer"); private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile( "(\\w+)\\((\\w+)\\)"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 7d3dc99ca0fba..6a57fdc38a654 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -27,6 +27,7 @@ import java.util.EnumMap; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -109,13 +110,27 @@ @InterfaceStability.Evolving public class FSEditLogLoader { static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName()); - static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec + + /** Number of edits to load at each iteration */ + // TODO: make this configurable? + private static final long REPLAY_BATCH_SIZE = 10000; + + /** + * The interval between each edits loading. This allows observer + * to process read requests in between. + * */ + private static final long REPLAY_BATCH_INTERVAL = 100; // 100 ms private final FSNamesystem fsNamesys; private long lastAppliedTxId; /** Total number of end transactions loaded. 
*/ private int totalEdits = 0; - + + private static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec + + /** The last time a replay progress is logged */ + private long lastLogTime = 0; + public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) { this.fsNamesys = fsNamesys; this.lastAppliedTxId = lastAppliedTxId; @@ -136,19 +151,17 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, StartupProgress prog = NameNode.getStartupProgress(); Step step = createStartupProgressStep(edits); prog.beginStep(Phase.LOADING_EDITS, step); - fsNamesys.writeLock(); try { long startTime = monotonicNow(); FSImage.LOG.info("Start loading edits file " + edits.getName()); long numEdits = loadEditRecords(edits, false, expectedStartingTxId, startOpt, recovery); - FSImage.LOG.info("Edits file " + edits.getName() - + " of size " + edits.length() + " edits # " + numEdits + FSImage.LOG.info("Edits file " + edits.getName() + + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds"); return numEdits; } finally { edits.close(); - fsNamesys.writeUnlock("loadFSEdits"); prog.endStep(Phase.LOADING_EDITS, step); } } @@ -156,6 +169,42 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, long loadEditRecords(EditLogInputStream in, boolean closeOnExit, long expectedStartingTxId, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { + long lastTxId = in.getLastTxId(); + long numTxns = (lastTxId - expectedStartingTxId) + 1; + long expectedTxId = expectedStartingTxId; + StartupProgress prog = NameNode.getStartupProgress(); + Step step = createStartupProgressStep(in); + prog.setTotal(Phase.LOADING_EDITS, step, numTxns); + Counter counter = prog.getCounter(Phase.LOADING_EDITS, step); + + long totalNumEdits = 0; + lastLogTime = monotonicNow(); + try { + FSImage.LOG.info("Start loading edit log in a batch size of " + REPLAY_BATCH_SIZE); + while (true) { + LoadResult p = loadEditRecords(in, expectedStartingTxId, + expectedTxId, startOpt, recovery, REPLAY_BATCH_SIZE, counter, numTxns); + totalNumEdits += p.numEdits; + expectedTxId = p.expectedTxId; + if (!p.hasNext) { + break; + } + TimeUnit.MILLISECONDS.sleep(REPLAY_BATCH_INTERVAL); + } + } catch (InterruptedException e) { + throw new IOException("Interrupted between applying edit log ops", e); + } finally { + if(closeOnExit) { + in.close(); + } + } + return totalNumEdits; + } + + private LoadResult loadEditRecords(EditLogInputStream in, + long startTxId, long expectedStartingTxId, StartupOption startOpt, + MetaRecoveryContext recovery, long batchSize, Counter counter, + long numTxns) throws IOException { FSDirectory fsDir = fsNamesys.dir; EnumMap> opCounts = @@ -173,22 +222,17 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, long expectedTxId = expectedStartingTxId; long numEdits = 0; - long lastTxId = in.getLastTxId(); - long numTxns = (lastTxId - expectedStartingTxId) + 1; - StartupProgress prog = NameNode.getStartupProgress(); - Step step = createStartupProgressStep(in); - prog.setTotal(Phase.LOADING_EDITS, step, numTxns); - Counter counter = prog.getCounter(Phase.LOADING_EDITS, step); - long lastLogTime = monotonicNow(); long lastInodeId = fsNamesys.dir.getLastInodeId(); - + boolean hasMore = true; + try { - while (true) { + while (numEdits < batchSize) { try { FSEditLogOp op; try { op = in.readOp(); if (op == null) { + hasMore = false; break; } } catch (Throwable e) { @@ -250,7 +294,7 @@ long 
loadEditRecords(EditLogInputStream in, boolean closeOnExit, } // Now that the operation has been successfully decoded and // applied, update our bookkeeping. - incrOpCount(op.opCode, opCounts, step, counter); + incrOpCount(op.opCode, opCounts, counter); if (op.hasTransactionId()) { lastAppliedTxId = op.getTransactionId(); expectedTxId = lastAppliedTxId + 1; @@ -261,7 +305,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, if (op.hasTransactionId()) { long now = monotonicNow(); if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) { - long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1; + long deltaTxId = lastAppliedTxId - startTxId + 1; int percent = Math.round((float) deltaTxId / numTxns * 100); LOG.info("replaying edit log: " + deltaTxId + "/" + numTxns + " transactions completed. (" + percent + "%)"); @@ -272,18 +316,17 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, totalEdits++; } catch (RollingUpgradeOp.RollbackException e) { LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback."); + hasMore = false; break; } catch (MetaRecoveryContext.RequestStopException e) { MetaRecoveryContext.LOG.warn("Stopped reading edit log at " + in.getPosition() + "/" + in.length()); + hasMore = false; break; } } } finally { fsNamesys.dir.resetLastInodeId(lastInodeId); - if(closeOnExit) { - in.close(); - } fsDir.writeUnlock(); fsNamesys.writeUnlock("loadEditRecords"); @@ -295,9 +338,21 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, dumpOpCounts(opCounts); } } - return numEdits; + return new LoadResult(numEdits, expectedTxId, hasMore); } - + + private static class LoadResult { + final long numEdits; + final long expectedTxId; + final boolean hasNext; + + LoadResult(long numEdits, long expectedTxId, boolean hasNext) { + this.numEdits = numEdits; + this.expectedTxId = expectedTxId; + this.hasNext = hasNext; + } + } + // allocate and update last allocated inode id private long getAndUpdateLastInodeId(long inodeIdFromOp, int logVersion, long lastInodeId) throws IOException { @@ -1093,8 +1148,7 @@ private static void dumpOpCounts( } private void incrOpCount(FSEditLogOpCodes opCode, - EnumMap> opCounts, Step step, - Counter counter) { + EnumMap> opCounts, Counter counter) { Holder holder = opCounts.get(opCode); if (holder == null) { holder = new Holder(1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 217209a40ab0e..e55b5f7fc33cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -519,7 +519,7 @@ private void logAuditEvent(boolean succeeded, private final ReentrantLock cpLock; /** - * Used when this NN is in standby state to read from the shared edit log. + * Used when this NN is in standby or observer state to read from the shared edit log. */ private EditLogTailer editLogTailer = null; @@ -1324,7 +1324,6 @@ void startStandbyServices(final Configuration conf) throws IOException { // During startup, we're already open for read. getFSImage().editLog.initSharedJournalsForRead(); } - blockManager.setPostponeBlocksFromFuture(true); // Disable quota checks while in standby. 
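
The batching introduced in FSEditLogLoader above boils down to the following loop. This is a minimal, self-contained sketch rather than the patched class: EditSource and the apply step are hypothetical stand-ins for EditLogInputStream and the per-op apply logic, and the constants mirror REPLAY_BATCH_SIZE / REPLAY_BATCH_INTERVAL. Pausing between batches keeps the namesystem lock from being held for the whole edit file, so an observer can interleave read RPCs with edit replay.

    import java.util.concurrent.TimeUnit;

    /** Hypothetical illustration of the batched edit replay loop. */
    class BatchedReplaySketch {
      private static final long REPLAY_BATCH_SIZE = 10000;   // edits applied per batch
      private static final long REPLAY_BATCH_INTERVAL = 100; // ms to wait between batches

      /** Stand-in for EditLogInputStream; returns null once the stream is exhausted. */
      interface EditSource {
        Object readOp();
      }

      long replay(EditSource in) throws InterruptedException {
        long totalEdits = 0;
        boolean hasMore = true;
        while (hasMore) {
          long numEdits = 0;
          // One batch is applied while (conceptually) holding the namesystem lock.
          while (numEdits < REPLAY_BATCH_SIZE) {
            Object op = in.readOp();
            if (op == null) {
              hasMore = false;
              break;
            }
            // apply(op) would go here
            numEdits++;
          }
          totalEdits += numEdits;
          if (hasMore) {
            // Release the lock and pause so read RPCs can be served between batches.
            TimeUnit.MILLISECONDS.sleep(REPLAY_BATCH_INTERVAL);
          }
        }
        return totalEdits;
      }
    }
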
@@ -1337,6 +1336,24 @@ void startStandbyServices(final Configuration conf) throws IOException { } } + /** + * Start services required in observer state + * + * @throws IOException + */ + void startObserverState(final Configuration conf) throws IOException { + LOG.info("Starting services required for observer state"); + if (!getFSImage().editLog.isOpenForRead()) { + // During startup, we're already open for read. + getFSImage().editLog.initSharedJournalsForRead(); + } + + blockManager.setPostponeBlocksFromFuture(true); + dir.disableQuotaChecks(); + editLogTailer = new EditLogTailer(this, conf); + editLogTailer.start(); + } + /** * Called when the NN is in Standby state and the editlog tailer tails the * OP_ROLLING_UPGRADE_START. @@ -1373,7 +1390,18 @@ void stopStandbyServices() throws IOException { getFSImage().editLog.close(); } } - + + /** Stop services required in observer state */ + void stopObserverServices() throws IOException { + LOG.info("Stopping services started for observer state"); + if (editLogTailer != null) { + editLogTailer.stop(); + } + if (dir != null && getFSImage() != null && getFSImage().editLog != null) { + getFSImage().editLog.close(); + } + } + @Override public void checkOperation(OperationCategory op) throws StandbyException { if (haContext != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index fd72eb65fb175..9957fb72f8088 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverState; import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; @@ -266,6 +267,7 @@ public static enum OperationCategory { private static final String USAGE = "Usage: hdfs namenode [" + StartupOption.BACKUP.getName() + "] | \n\t[" + + StartupOption.OBSERVER.getName() + "] | \n\t[" + StartupOption.CHECKPOINT.getName() + "] | \n\t[" + StartupOption.FORMAT.getName() + " [" + StartupOption.CLUSTERID.getName() + " cid ] [" @@ -329,6 +331,7 @@ public long getProtocolVersion(String protocol, LoggerFactory.getLogger("BlockStateChange"); public static final HAState ACTIVE_STATE = new ActiveState(); public static final HAState STANDBY_STATE = new StandbyState(); + public static final HAState OBSERVER_STATE = new ObserverState(); private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace."; @@ -931,9 +934,11 @@ private void stopAtException(Exception e){ } protected HAState createHAState(StartupOption startOpt) { - if (!haEnabled || startOpt == StartupOption.UPGRADE + if (!haEnabled || startOpt == StartupOption.UPGRADE || startOpt == StartupOption.UPGRADEONLY) { return ACTIVE_STATE; + } else if (startOpt == StartupOption.OBSERVER) { + return OBSERVER_STATE; } else { return STANDBY_STATE; } @@ -1396,6 +1401,8 @@ static StartupOption parseArguments(String args[]) { startOpt = StartupOption.BACKUP; } else if 
(StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) { startOpt = StartupOption.CHECKPOINT; + } else if (StartupOption.OBSERVER.getName().equalsIgnoreCase(cmd)) { + startOpt = StartupOption.OBSERVER; + } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) { startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ? @@ -1720,6 +1727,9 @@ synchronized void transitionToActive() if (!haEnabled) { throw new ServiceFailedException("HA for namenode is not enabled"); } + if (state == OBSERVER_STATE) { + throw new ServiceFailedException("Cannot transition from Observer to Active"); + } state.setState(haContext, ACTIVE_STATE); } @@ -1729,6 +1739,9 @@ synchronized void transitionToStandby() if (!haEnabled) { throw new ServiceFailedException("HA for namenode is not enabled"); } + if (state == OBSERVER_STATE) { + throw new ServiceFailedException("Cannot transition from Observer to Standby"); + } state.setState(haContext, STANDBY_STATE); } @@ -1786,12 +1799,11 @@ public String getNNRole() { @Override // NameNodeStatusMXBean public String getState() { - String servStateStr = ""; - HAServiceState servState = getServiceState(); - if (null != servState) { - servStateStr = servState.toString(); + String stateStr = ""; + if (state != null) { + stateStr = state.toString(); } - return servStateStr; + return stateStr; } @Override // NameNodeStatusMXBean @@ -1880,6 +1892,26 @@ public void startStandbyServices() throws IOException { } } + @Override + public void startObserverServices() throws IOException { + try { + namesystem.startObserverState(conf); + } catch (Throwable t) { + doImmediateShutdown(t); + } + } + + @Override + public void stopObserverServices() throws IOException { + try { + if (namesystem != null) { + namesystem.stopObserverServices(); + } + } catch (Throwable t) { + doImmediateShutdown(t); + } + } + @Override public void prepareToStopStandbyServices() throws ServiceFailedException { try { @@ -1929,11 +1961,15 @@ public boolean allowStaleReads() { public boolean isStandbyState() { return (state.equals(STANDBY_STATE)); } - + public boolean isActiveState() { return (state.equals(ACTIVE_STATE)); } + public boolean isObserverState() { + return state.equals(OBSERVER_STATE); + } + /** * Returns whether the NameNode is completely started */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 84041fa9ba90e..da8d80886b951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1445,6 +1445,7 @@ public Boolean call() throws IOException { if (nn.getFSImage().isUpgradeFinalized() && !namesystem.isRollingUpgrade() && !nn.isStandbyState() && + !nn.isObserverState() && noStaleStorages) { return new FinalizeCommand(poolId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index a763e66f946cd..a6431195ff647 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -23,7 +23,6 @@ import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Collection; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,17 +100,28 @@ public class EditLogTailer { * available to be read from. */ private final long sleepTimeMs; - + + /** + * Whether this is for an observer namenode + */ + private final boolean isObserver; + public EditLogTailer(FSNamesystem namesystem, Configuration conf) { this.tailerThread = new EditLogTailerThread(); this.conf = conf; this.namesystem = namesystem; this.editLog = namesystem.getEditLog(); - + lastLoadTimeMs = monotonicNow(); + isObserver = "observer".equals(namesystem.getHAState()); - logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, - DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000; + if (isObserver) { + // Observer NN doesn't roll edit log + logRollPeriodMs = -1; + } else { + logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, + DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000; + } if (logRollPeriodMs >= 0) { this.activeAddr = getActiveNodeAddress(); Preconditions.checkArgument(activeAddr.getPort() > 0, @@ -203,7 +213,10 @@ void doTailEdits() throws IOException, InterruptedException { // transitionToActive RPC takes the write lock before calling // tailer.stop() -- so if we're not interruptible, it will // deadlock. - namesystem.writeLockInterruptibly(); + if (!isObserver) { + // Observer doesn't need lock since it never transition to active/standby + namesystem.writeLockInterruptibly(); + } try { FSImage image = namesystem.getFSImage(); @@ -248,7 +261,9 @@ void doTailEdits() throws IOException, InterruptedException { } lastLoadedTxnId = image.getLastAppliedTxId(); } finally { - namesystem.writeUnlock(); + if (!isObserver) { + namesystem.writeUnlock(); + } } } @@ -271,6 +286,7 @@ private boolean tooLongSinceLastLoad() { * Trigger the active node to roll its logs. 
*/ private void triggerActiveLogRoll() { + Preconditions.checkArgument(!isObserver, "Should not trigger roll from observer NN"); LOG.info("Triggering log roll on remote NameNode " + activeAddr); try { getActiveNodeProxy().rollEditLog(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java index 823738798d64a..2b0a65ffcbc71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java @@ -45,6 +45,12 @@ public interface HAContext { /** Start the services required in standby state */ public void startStandbyServices() throws IOException; + /** Start the services required in observer state */ + public void startObserverServices() throws IOException; + + /** Stop the services when exiting observer state */ + public void stopObserverServices() throws IOException; + /** Prepare to exit the standby state */ public void prepareToStopStandbyServices() throws ServiceFailedException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverState.java new file mode 100644 index 0000000000000..1fa5115a397c2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverState.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.StandbyException; + +import java.io.IOException; + +/** + * Observer state of the namenode. In this state, the namenode service only + * handle operations of type {@link org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory#READ}. + * It also doesn't participate in either checkpointing or failover. 
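+ *
+ * For example, a read such as getFileStatus passes the checkOperation gate
+ * below, while a mutation such as mkdirs is rejected here with a
+ * StandbyException, signalling the client to retry against the active namenode.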
+ */ +@InterfaceAudience.Private +public class ObserverState extends StandbyState { + @Override + public void enterState(HAContext context) throws ServiceFailedException { + try { + context.startObserverServices(); + } catch (IOException e) { + throw new ServiceFailedException("Failed to start observer services", e); + } + } + + @Override + public void exitState(HAContext context) throws ServiceFailedException { + try { + context.stopObserverServices(); + } catch (IOException e) { + throw new ServiceFailedException("Failed to stop observer services", e); + } + } + + @Override + public void checkOperation(HAContext context, NameNode.OperationCategory op) throws StandbyException { + if (op == NameNode.OperationCategory.UNCHECKED || op == NameNode.OperationCategory.READ) { + return; + } + String faq = ". Visit https://s.apache.org/sbnn-error"; + String msg = "Operation category " + op + " is not supported in state " + + context.getState() + faq; + throw new StandbyException(msg); + } + + @Override + public String toString() { + return "observer"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java index 9a51190b17607..1ae9fb71a1fcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java @@ -48,7 +48,7 @@ public static List getRemoteNameNodes(Configuration conf, St if (nsId == null) { return Collections.emptyList(); } - List otherNodes = HAUtil.getConfForOtherNodes(conf); + List otherNodes = HAUtil.getConfForOtherNodes(conf, true); List nns = new ArrayList(); for (Configuration otherNode : otherNodes) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java index e6cf16c89dfc5..e512a60401ea0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java @@ -67,6 +67,8 @@ enum Command { NAMENODE("-namenodes", "gets list of namenodes in the cluster."), SECONDARY("-secondaryNameNodes", "gets list of secondary namenodes in the cluster."), + OBSERVER("-observers", + "get list of observer namenodes in the cluster"), BACKUP("-backupNodes", "gets list of backup nodes in the cluster."), INCLUDE_FILE("-includeFile", "gets the include file path that defines the datanodes " + @@ -82,6 +84,8 @@ enum Command { map = new HashMap(); map.put(StringUtils.toLowerCase(NAMENODE.getName()), new NameNodesCommandHandler()); + map.put(StringUtils.toLowerCase(OBSERVER.getName()), + new ObserverNameNodesCommandHandler()); map.put(StringUtils.toLowerCase(SECONDARY.getName()), new SecondaryNameNodesCommandHandler()); map.put(StringUtils.toLowerCase(BACKUP.getName()), @@ -187,11 +191,22 @@ int doWorkInternal(GetConf tool, String[] args) throws Exception { static class NameNodesCommandHandler extends CommandHandler { @Override int doWorkInternal(GetConf tool, String []args) throws IOException { - tool.printMap(DFSUtil.getNNServiceRpcAddressesForCluster(tool.getConf())); + tool.printMap(DFSUtil.getNNServiceRpcAddressesForClusterExcludingObservers(tool.getConf())); return 0; } } - + + /** 
+ * Handler for {@link Command#OBSERVER} + */ + static class ObserverNameNodesCommandHandler extends CommandHandler { + @Override + public int doWorkInternal(GetConf tool, String []args) throws IOException { + tool.printMap(DFSUtil.getObserverNNServiceRpcAddressesForCluster(tool.getConf())); + return 0; + } + } + /** * Handler for {@link Command#BACKUP} */ @@ -213,7 +228,7 @@ public int doWorkInternal(GetConf tool, String []args) throws IOException { return 0; } } - + /** * Handler for {@link Command#NNRPCADDRESSES} * If rpc addresses are defined in configuration, we return them. Otherwise, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 9b1d09017ff4d..8d1146896a716 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1359,6 +1359,16 @@ + + dfs.client.enable.stale-read + false + + Enable stale reads from observer namenodes. To use this, the client + must also set failover proxy provider to + org.apache.hadoop.hdfs.server.namenode.ha.StaleReadProxyProvider. + + + dfs.client.failover.max.attempts 15 @@ -1464,6 +1474,17 @@ + + dfs.ha.observer.namenodes.EXAMPLENAMESERVICE + + + The prefix for a given nameservice, contains a comma-separated + list of observer namenodes for a given nameservice (eg EXAMPLENAMESERVICE). + Note that all the observer namenodes must also appear in + dfs.ha.namenodes.EXAMPLENAMESERVICE. + + + dfs.ha.namenode.id diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index eeae8999d2c3c..1efe58a208f96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -138,6 +138,7 @@ Runs the HDFS filesystem checking utility. See [fsck](./HdfsUserGuide.html#fsck) Usage: hdfs getconf -namenodes + hdfs getconf -observers hdfs getconf -secondaryNameNodes hdfs getconf -backupNodes hdfs getconf -includeFile @@ -148,6 +149,7 @@ Usage: | COMMAND\_OPTION | Description | |:---- |:---- | | `-namenodes` | gets list of namenodes in the cluster. | +| `-observers` | gets list of observer namenodes in the cluster. | | `-secondaryNameNodes` | gets list of secondary namenodes in the cluster. | | `-backupNodes` | gets list of backup nodes in the cluster. | | `-includeFile` | gets the include file path that defines the datanodes that can join the cluster. 
| diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index adcf6399e1063..ec711d3cb9c68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -46,6 +46,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; import java.io.File; @@ -105,7 +106,6 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -459,7 +459,9 @@ protected MiniDFSCluster(Builder builder) throws IOException { assert builder.storageTypes == null || builder.storageTypes.length == builder.numDataNodes; final int numNameNodes = builder.nnTopology.countNameNodes(); + final int numObservers = builder.nnTopology.countObservers(); LOG.info("starting cluster: numNameNodes=" + numNameNodes + + ", numObservers=" + numObservers + ", numDataNodes=" + builder.numDataNodes); this.storagesPerDatanode = builder.storagesPerDatanode; @@ -941,7 +943,7 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, * Do the rest of the NN configuration for things like shared edits, * as well as directory formatting, etc. for a single nameservice * @param nnCounter the count of the number of namenodes already configured/started. Also, - * acts as the index to the next NN to start (since indicies start at 0). + * acts as the index to the next NN to start (since indices start at 0). * @throws IOException */ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCounter, @@ -962,10 +964,13 @@ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCo FileUtil.fullyDelete(new File(sharedEditsUri)); } + // create a list containing both ordinary and observer NNs. + List allNNs = nameservice.getNNs(); + // Now format first NN and copy the storage directory from that node to the others. int nnIndex = nnCounter; Collection prevNNDirs = null; - for (NNConf nn : nameservice.getNNs()) { + for (NNConf nn : allNNs) { initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs, nnIndex); Collection namespaceDirs = FSNamesystem.getNamespaceDirs(conf); @@ -1019,7 +1024,9 @@ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCo for (NNConf nn : nameservice.getNNs()) { initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs, enableManagedDfsDirsRedundancy, nnIndex++); - NameNodeInfo info = createNameNode(conf, false, operation, + // Observer NN always use StartupOption.OBSERVER as startup configuration. + StartupOption so = nn.getIsObserver() ? 
StartupOption.OBSERVER : operation; + NameNodeInfo info = createNameNode(conf, false, so, clusterId, nsId, nn.getNnId()); // Record the last namenode uri @@ -1028,6 +1035,7 @@ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCo info.conf.get(FS_DEFAULT_NAME_KEY); } } + if (!federation && lastDefaultFileSystem != null) { // Set the default file system to the actual bind address of NN. conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem); @@ -1078,10 +1086,11 @@ public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean fede // need to have - have to do this a priori before starting // *any* of the NNs, so they know to come up in standby. List nnIds = Lists.newArrayList(); + List obIds = Lists.newArrayList(); // Iterate over the NNs in this nameservice for (NNConf nn : nameservice.getNNs()) { nnIds.add(nn.getNnId()); - + if (nn.getIsObserver()) obIds.add(nn.getNnId()); initNameNodeAddress(conf, nameservice.getId(), nn); } @@ -1091,6 +1100,8 @@ public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean fede conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner .on(",").join(nnIds)); } + conf.set(DFSUtil.addKeySuffixes(DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX, nameservice.getId()), + Joiner.on(",").join(obIds)); } } @@ -1103,9 +1114,34 @@ public static URI formatSharedEditsDir(File baseDir, int minNN, int maxNN) return fileAsURI(new File(baseDir, "shared-edits-" + minNN + "-through-" + maxNN)); } - + + /** + * Return namenodes for all nameservices configured. Note this doesn't + * include observer namenodes. + * TODO: should we include observer namenodes? + */ public NameNodeInfo[] getNameNodeInfos() { - return this.namenodes.values().toArray(new NameNodeInfo[0]); + return filterNameNodeInfos(this.namenodes.values()); + } + + /** + * @param nsIndex index of the namespace id to check + * @return all the observer namenodes bound to the given namespace index + */ + public NameNodeInfo[] getObserverNameNodeInfos(int nsIndex) { + int i = 0; + for (String ns : this.namenodes.keys()) { + if (i++ == nsIndex) { + List result = new ArrayList<>(); + for (NameNodeInfo nn : this.namenodes.get(ns)) { + if (nn.startOpt == StartupOption.OBSERVER) { + result.add(nn); + } + } + return result.toArray(new NameNodeInfo[0]); + } + } + return null; } /** @@ -1116,7 +1152,7 @@ public NameNodeInfo[] getNameNodeInfos(int nsIndex) { int i = 0; for (String ns : this.namenodes.keys()) { if (i++ == nsIndex) { - return this.namenodes.get(ns).toArray(new NameNodeInfo[0]); + return filterNameNodeInfos(this.namenodes.get(ns)); } } return null; @@ -1129,12 +1165,24 @@ public NameNodeInfo[] getNameNodeInfos(int nsIndex) { public NameNodeInfo[] getNameNodeInfos(String nameservice) { for (String ns : this.namenodes.keys()) { if (nameservice.equals(ns)) { - return this.namenodes.get(ns).toArray(new NameNodeInfo[0]); + return filterNameNodeInfos(this.namenodes.get(ns)); } } return null; } + /** + * Filter out all observer namenodes from 'namenodeInfos'. 
+ */ + private NameNodeInfo[] filterNameNodeInfos(Collection namenodeInfos) { + List result = new ArrayList<>(); + for (NameNodeInfo nn : namenodeInfos) { + if (nn.startOpt != StartupOption.OBSERVER) { + result.add(nn); + } + } + return result.toArray(new NameNodeInfo[0]); + } private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId, boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex) @@ -1650,7 +1698,6 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, } this.numDataNodes += numDataNodes; waitActive(); - setDataNodeStorageCapacities( curDatanodesNum, @@ -3074,11 +3121,27 @@ private void checkSingleNameNode() { /** * Add a namenode to a federated cluster and start it. Configuration of * datanodes in the cluster is refreshed to register with the new namenode. - * - * @return newly started namenode */ public void addNameNode(Configuration conf, int namenodePort) throws IOException { + addNameNode(conf, namenodePort, false); + } + + /** + * Add a observer namenode to a federated cluster and start it. Configuration of + * datanodes in the cluster is refreshed to register with the new namenode. + */ + public void addObserver(Configuration conf, int namenodePort) + throws IOException { + addNameNode(conf, namenodePort, true); + } + + /** + * Add a namenode to a federated cluster and start it. Configuration of + * datanodes in the cluster is refreshed to register with the new namenode. + */ + public void addNameNode(Configuration conf, int namenodePort, boolean observer) + throws IOException { if(!federation) throw new IOException("cannot add namenode to non-federated cluster"); @@ -3095,8 +3158,9 @@ public void addNameNode(Configuration conf, int namenodePort) // figure out the current number of NNs NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId); int nnIndex = infos == null ? 0 : infos.length; + StartupOption operation = observer ? StartupOption.OBSERVER : null; initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex); - NameNodeInfo info = createNameNode(conf, true, null, null, nameserviceId, nnId); + createNameNode(conf, true, operation, null, nameserviceId, nnId); // Refresh datanodes with the newly started namenode for (DataNodeProperties dn : dataNodes) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java index b9786a32a7516..1787ee2ccf40c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -50,13 +51,12 @@ public static MiniDFSNNTopology simpleSingleNN( .setHttpPort(nameNodeHttpPort) .setIpcPort(nameNodePort))); } - /** * Set up an HA topology with a single HA nameservice. */ public static MiniDFSNNTopology simpleHATopology() { - return simpleHATopology(2); + return simpleHATopology(2, 0); } /** @@ -64,12 +64,23 @@ public static MiniDFSNNTopology simpleHATopology() { * @param nnCount of namenodes to use with the nameservice */ public static MiniDFSNNTopology simpleHATopology(int nnCount) { + return simpleHATopology(nnCount, 0); + } + + /** + * Set up an HA topology with a single HA nameservice. 
+ * @param nnCount of namenodes to use with the nameservice + * @param obCount of observers to use with the nameservice + */ + public static MiniDFSNNTopology simpleHATopology(int nnCount, int obCount) { MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("minidfs-ns"); for (int i = 1; i <= nnCount; i++) { nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i)); } - MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice); - return topology; + for (int i = 1; i <= obCount; i++) { + nameservice.addNN(new MiniDFSNNTopology.NNConf("ob" + i)); + } + return new MiniDFSNNTopology().addNameservice(nameservice); } /** @@ -131,12 +142,31 @@ public MiniDFSNNTopology addNameservice(NSConf nameservice) { } public int countNameNodes() { + return countInternal(false); + } + + public int countObservers() { + return countInternal(true); + } + + private int countInternal(boolean observer) { int count = 0; for (NSConf ns : nameservices) { - count += ns.nns.size(); + for (NNConf nn : ns.getNNs()) { + if (observer == nn.getIsObserver()) { + count++; + } + } } return count; } + + /** + * Count all namenodes, including both ordinary and observer namenodes + */ + public int countAllNameNodes() { + return countNameNodes() + countObservers(); + } public NNConf getOnlyNameNode() { Preconditions.checkState(countNameNodes() == 1, @@ -198,7 +228,7 @@ public List getNameservices() { public static class NSConf { private final String id; private final List nns = Lists.newArrayList(); - + public NSConf(String id) { this.id = id; } @@ -222,6 +252,7 @@ public static class NNConf { private int httpPort; private int ipcPort; private String clusterId; + private boolean isObserver; public NNConf(String nnId) { this.nnId = nnId; @@ -243,6 +274,10 @@ String getClusterId() { return clusterId; } + boolean getIsObserver() { + return isObserver; + } + public NNConf setHttpPort(int httpPort) { this.httpPort = httpPort; return this; @@ -257,6 +292,11 @@ public NNConf setClusterId(String clusterId) { this.clusterId = clusterId; return this; } + + public NNConf setIsObserver() { + this.isObserver = true; + return this; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index d27cfb60ec608..d2a2a412e5479 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -82,6 +82,7 @@ import com.google.common.collect.Sets; +// TODO: add more tests for observer namenodes. 
public class TestDFSUtil { static final String NS1_NN_ADDR = "ns1-nn.example.com:9820"; @@ -1019,13 +1020,13 @@ public void testGetNNServiceRpcAddressesForNsIds() throws IOException { } Map> nnMap = DFSUtil - .getNNServiceRpcAddressesForCluster(conf); + .getNNServiceRpcAddressesForClusterExcludingObservers(conf); assertEquals(1, nnMap.size()); assertTrue(nnMap.containsKey("nn1")); conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3"); try { - DFSUtil.getNNServiceRpcAddressesForCluster(conf); + DFSUtil.getNNServiceRpcAddressesForClusterExcludingObservers(conf); fail("Should fail for misconfiguration"); } catch (IOException ignored) { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java index ecfc39abebca5..e283c44b363cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java @@ -47,6 +47,7 @@ public static class Builder { private final Configuration conf; private StartupOption startOpt = null; private int numNNs = 2; + private int numObs = 0; private final MiniDFSCluster.Builder dfsBuilder; public Builder(Configuration conf) { @@ -72,6 +73,25 @@ public Builder setNumNameNodes(int nns) { this.numNNs = nns; return this; } + + public Builder setNumObservers(int obs) { + this.numObs = obs; + return this; + } + } + + public static MiniDFSNNTopology createDefaultTopology(int nns, int obs, int startingPort) { + MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE); + for (int i = 0; i < nns; i++) { + nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setIpcPort(startingPort++) + .setHttpPort(startingPort++)); + } + for (int i = 0; i < obs; i++) { + nameservice.addNN(new MiniDFSNNTopology.NNConf("ob" + (i + nns)).setIpcPort(startingPort++) + .setHttpPort(startingPort++).setIsObserver()); + } + + return new MiniDFSNNTopology().addNameservice(nameservice); } public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) { @@ -104,7 +124,7 @@ private MiniQJMHACluster(Builder builder) throws IOException { URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE); // start cluster with specified NameNodes - MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, basePort); + MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, builder.numObs, basePort); initHAConf(journalURI, builder.conf, basePort, builder.numNNs); @@ -151,7 +171,7 @@ private Configuration initHAConf(URI journalURI, Configuration conf, int basePor } // use standard failover configurations - HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns); + HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns.size(), nns); return conf; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index f048718be92b8..6d8ee9a4a5d48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX; import java.io.IOException; import java.net.InetSocketAddress; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -169,7 +171,7 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName) { setFailoverConfigurations(cluster, conf, logicalName, 0); } - + /** Sets the required configurations for performing failover. */ public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName, int nsIndex) { @@ -178,12 +180,31 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster, for (MiniDFSCluster.NameNodeInfo nn : nns) { nnAddresses.add(nn.nameNode.getNameNodeAddress()); } - setFailoverConfigurations(conf, logicalName, nnAddresses); + int nnSize = nnAddresses.size(); + setFailoverConfigurations(conf, logicalName, nnSize, nnAddresses); + } + + /** Sets the required configurations for performing failover. */ + public static void setFailoverConfigurations(MiniDFSCluster cluster, + Configuration conf, String logicalName, int nsIndex, boolean includeObservers) { + MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); + List nnAddresses = new ArrayList(3); + for (MiniDFSCluster.NameNodeInfo nn : nns) { + nnAddresses.add(nn.nameNode.getNameNodeAddress()); + } + int nnSize = nnAddresses.size(); + if (includeObservers) { + MiniDFSCluster.NameNodeInfo[] obs = cluster.getObserverNameNodeInfos(nsIndex); + for (MiniDFSCluster.NameNodeInfo ob : obs) { + nnAddresses.add(ob.nameNode.getNameNodeAddress()); + } + } + setFailoverConfigurations(conf, logicalName, nnSize, nnAddresses); } public static void setFailoverConfigurations(Configuration conf, String logicalName, InetSocketAddress ... nnAddresses){ - setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses)); + setFailoverConfigurations(conf, logicalName, nnAddresses.length, Arrays.asList(nnAddresses)); } /** @@ -191,7 +212,15 @@ public static void setFailoverConfigurations(Configuration conf, String logicalN */ public static void setFailoverConfigurations(Configuration conf, String logicalName, List nnAddresses) { - setFailoverConfigurations(conf, logicalName, + setFailoverConfigurations(conf, logicalName, nnAddresses.size(), nnAddresses); + } + + /** + * Sets the required configurations for performing failover + */ + public static void setFailoverConfigurations(Configuration conf, + String logicalName, int nnSize, List nnAddresses) { + setFailoverConfigurations(conf, logicalName, nnSize, Iterables.transform(nnAddresses, new Function() { // transform the inet address to a simple string @@ -202,24 +231,62 @@ public String apply(InetSocketAddress addr) { })); } - public static void setFailoverConfigurations(Configuration conf, String logicalName, - Iterable nnAddresses) { + /** + * Set configuration 'conf' for failover. 'nnSize' is the number of + * active/standby NNs in the cluster, while 'nnAddresses' contains + * active/standby/observer NNs. 
+ * + * Precondition: 'nnAddresses.size()' >= 'nnSize' + */ + public static void setFailoverConfigurations(Configuration conf, + String logicalName, int nnSize, Iterable nnAddresses) { List nnids = new ArrayList(); + List obids = new ArrayList<>(); int i = 0; for (String address : nnAddresses) { String nnId = "nn" + (i + 1); nnids.add(nnId); + if (i >= nnSize) { + obids.add(nnId); + } conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address); i++; } conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName), Joiner.on(',').join(nnids)); + if (obids.size() != 0) { + conf.set(DFSUtil.addKeySuffixes(DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX, logicalName), + Joiner.on(',').join(obids)); + } conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, ConfiguredFailoverProxyProvider.class.getName()); conf.set("fs.defaultFS", "hdfs://" + logicalName); } + public static DistributedFileSystem configureStaleReadFs( + MiniDFSCluster cluster, Configuration conf, + int nsIndex) throws IOException, URISyntaxException { + conf = new Configuration(conf); + String logicalName = getLogicalHostname(cluster); + setFailoverConfigurations(cluster, conf, logicalName, nsIndex, true); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, + StaleReadProxyProvider.class.getName()); + FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); + return (DistributedFileSystem) fs; + } + + public static HdfsAdmin getStaleReadAdmin( + MiniDFSCluster cluster, Configuration conf, + int nsIndex) throws IOException, URISyntaxException { + conf = new Configuration(conf); + String logicalName = getLogicalHostname(cluster); + setFailoverConfigurations(cluster, conf, logicalName, nsIndex, true); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, + StaleReadProxyProvider.class.getName()); + return new HdfsAdmin(new URI("hdfs://" + logicalName), conf); + } + public static String getLogicalHostname(MiniDFSCluster cluster) { return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNameNode.java new file mode 100644 index 0000000000000..e8e38749376ef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNameNode.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryInvocationHandler; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.net.URI; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestObserverNameNode { + private Configuration conf; + private MiniQJMHACluster qjmhaCluster; + private MiniDFSCluster dfsCluster; + private NameNode[] namenodes; + private Path testPath; + private Path testPath2; + private Path testPath3; + + /** These are set in each individual test case */ + private DistributedFileSystem dfs; + private RetryInvocationHandler handler; + private StaleReadProxyProvider provider; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + // Disable stale read first, and enable it in some of the individual tests + conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_ENABLE_STALE_READ, false); + + setUpCluster(1); + + testPath = new Path("/test"); + testPath2 = new Path("/test2"); + testPath3 = new Path("/test3"); + } + + @After + public void tearDown() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + @Test + public void testSimpleRead() throws Exception { + setStaleRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + try { + dfs.getFileStatus(testPath); + fail("Should throw FileNotFoundException"); + } catch (FileNotFoundException e) { + // Continue + } + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + assertSentTo(0); + } + + @Test + public void testFailOver() throws Exception { + setStaleRead(false); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + dfs.getFileStatus(testPath); + assertSentTo(0); + + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToActive(1); + dfsCluster.waitActive(); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + assertSentTo(1); + dfs.getFileStatus(testPath); + assertSentTo(1); + } + + @Test + public void testDoubleFailOver() throws Exception { + setStaleRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + dfs.mkdir(testPath2, FsPermission.getDefault()); + assertSentTo(0); + + dfsCluster.transitionToStandby(0); + dfsCluster.transitionToActive(1); + 
dfsCluster.waitActive(1); + + rollEditLogAndTail(1); + dfs.getFileStatus(testPath2); + assertSentTo(2); + dfs.mkdir(testPath3, FsPermission.getDefault()); + assertSentTo(1); + + dfsCluster.transitionToStandby(1); + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath3); + assertSentTo(2); + dfs.delete(testPath3, false); + assertSentTo(0); + } + + @Test + public void testObserverShutdown() throws Exception { + setStaleRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + + // Shutdown the observer - requests should go to active + dfsCluster.shutdownNameNode(2); + dfs.getFileStatus(testPath); + assertSentTo(0); + + // Start the observer again - requests should go to observer + dfsCluster.restartNameNode(2); + dfs.getFileStatus(testPath); + assertSentTo(2); + } + + @Test + public void testMultiObserver() throws Exception { + setUpCluster(2); + setStaleRead(true); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentToOne(2, 3); + + dfs.mkdir(testPath2, FsPermission.getDefault()); + rollEditLogAndTail(0); + + // Shutdown first observer, request should go to the second one + dfsCluster.shutdownNameNode(2); + dfs.listStatus(testPath2); + assertSentTo(3); + + // Restart the first observer + dfsCluster.restartNameNode(2); + dfs.listStatus(testPath); + assertSentToOne(2, 3); + + dfs.mkdir(testPath3, FsPermission.getDefault()); + rollEditLogAndTail(0); + + // Now shutdown the second observer, request should go to the first one + dfsCluster.shutdownNameNode(3); + dfs.listStatus(testPath3); + assertSentTo(2); + + // Shutdown both, request should go to active + dfsCluster.shutdownNameNode(2); + dfs.listStatus(testPath3); + assertSentTo(0); + } + + @Test + public void testTransition() throws Exception { + try { + dfsCluster.transitionToActive(2); + fail("Should get exception"); + } catch (ServiceFailedException e) { + // pass + } + + try { + dfsCluster.transitionToStandby(2); + fail("Should get exception"); + } catch (ServiceFailedException e) { + // pass + } + } + + @Test + public void testBootstrap() throws Exception { + for (URI u : dfsCluster.getNameDirs(2)) { + File dir = new File(u.getPath()); + assertTrue(FileUtil.fullyDelete(dir)); + } + int rc = BootstrapStandby.run( + new String[]{"-nonInteractive"}, + dfsCluster.getConfiguration(2) + ); + assertEquals(0, rc); + } + + private void setUpCluster(int numObservers) throws Exception { + qjmhaCluster = new MiniQJMHACluster.Builder(conf) + .setNumNameNodes(2) + .setNumObservers(numObservers) + .build(); + dfsCluster = qjmhaCluster.getDfsCluster(); + + namenodes = new NameNode[2 + numObservers]; + for (int i = 0; i < namenodes.length; i++) { + namenodes[i] = dfsCluster.getNameNode(i); + } + + dfsCluster.transitionToActive(0); + dfsCluster.waitActive(0); + } + + private void assertSentTo(int nnIdx) throws IOException { + FailoverProxyProvider.ProxyInfo pi = provider.getLastProxy(); + assertEquals(pi.proxyInfo, getNameNode(nnIdx).getNameNodeAddress().toString()); + } + + private void assertSentToOne(int... 
nnIndices) throws IOException { + FailoverProxyProvider.ProxyInfo pi = provider.getLastProxy(); + int i = 0; + for(int nnIdx : nnIndices) { + if(pi.proxyInfo.equals(getNameNode(nnIdx).getNameNodeAddress().toString())) { + i++; + } + } + assertEquals(1, i); + } + + private void setStaleRead(boolean enableStaleRead) throws Exception { + conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_ENABLE_STALE_READ, enableStaleRead); + dfs = HATestUtil.configureStaleReadFs(dfsCluster, conf, 0); + handler = (RetryInvocationHandler) Proxy.getInvocationHandler( + dfs.getClient().getNamenode()); + provider = (StaleReadProxyProvider) handler.getProxyProvider(); + } + + private void rollEditLogAndTail(int indexForActiveNN) throws Exception { + getNameNode(indexForActiveNN).getRpcServer().rollEditLog(); + for (int i = 2; i < namenodes.length; i++) { + getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + } + } + + private NameNode getNameNode(int idx) { + return dfsCluster.getNameNode(idx); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java index 94ce6b22fc7d7..00a72c1a49b03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hdfs.tools; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -40,6 +42,7 @@ import java.util.Map; import java.util.StringTokenizer; +import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -60,9 +63,10 @@ */ public class TestGetConf { enum TestType { - NAMENODE, BACKUP, SECONDARY, NNRPCADDRESSES + NAMENODE, OBSERVER, BACKUP, SECONDARY, NNRPCADDRESSES } - FileSystem localFileSys; + FileSystem localFileSys; + /** Setup federation nameServiceIds in the configuration */ private void setupNameServices(HdfsConfiguration conf, int nameServiceIdCount) { StringBuilder nsList = new StringBuilder(); @@ -75,6 +79,29 @@ private void setupNameServices(HdfsConfiguration conf, int nameServiceIdCount) { conf.set(DFS_NAMESERVICES, nsList.toString()); } + /** + * Setup namenodes specified by 'nnIds' for 'conf', for all + * 'nameServiceIdCount' nameservices. + * If 'onlyObserver' is true, this will use 'nnIds' as observer namenodes. + * Otherwise, it will use 'nnIds' as both ordinary and observer namenodes. 
+ */ + private void setupNameNodes(HdfsConfiguration conf, int nameServiceIdCount, + boolean onlyObserver, int[] nnIds) { + for (int c = 0; c < nameServiceIdCount; c++) { + StringBuilder nnList = new StringBuilder(); + String confKey = onlyObserver ? DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX + : DFS_HA_NAMENODES_KEY_PREFIX; + confKey = DFSUtil.addKeySuffixes(confKey, getNameServiceId(c)); + for (int i = 0; i < nnIds.length; i++) { + if (i > 0) { + nnList.append(","); + } + nnList.append(getNameNodeId(c, nnIds[i])); + } + conf.set(confKey, nnList.toString()); + } + } + /** Set a given key with value as address, for all the nameServiceIds. * @param conf configuration to set the addresses in * @param key configuration key @@ -83,13 +110,17 @@ private void setupNameServices(HdfsConfiguration conf, int nameServiceIdCount) { * @return list of addresses that are set in the configuration */ private String[] setupAddress(HdfsConfiguration conf, String key, - int nameServiceIdCount, int portOffset) { - String[] values = new String[nameServiceIdCount]; - for (int i = 0; i < nameServiceIdCount; i++, portOffset++) { - String nsID = getNameServiceId(i); - String specificKey = DFSUtil.addKeySuffixes(key, nsID); - values[i] = "nn" + i + ":" + portOffset; - conf.set(specificKey, values[i]); + int nameServiceIdCount, int portOffset, int[] nnIds) { + String[] values = new String[nameServiceIdCount * nnIds.length]; + int count = 0; + for (int i = 0; i < nameServiceIdCount; i++) { + for (int n = 0; n < nnIds.length; n++) { + String nsID = getNameServiceId(i); + String nnID = getNameNodeId(i, nnIds[n]); + String specificKey = DFSUtil.addKeySuffixes(key, nsID, nnID); + values[count] = nnID + ":" + portOffset++; + conf.set(specificKey, values[count++]); + } } return values; } @@ -98,9 +129,11 @@ private String[] setupAddress(HdfsConfiguration conf, String key, * Add namenodes to the static resolution list to avoid going * through DNS which can be really slow in some configurations. 
    */
-  private void setupStaticHostResolution(int nameServiceIdCount) {
+  private void setupStaticHostResolution(int nameServiceIdCount, int[] nnIds) {
     for (int i = 0; i < nameServiceIdCount; i++) {
-      NetUtils.addStaticResolution("nn" + i, "localhost");
+      for (int n = 0; n < nnIds.length; n++) {
+        NetUtils.addStaticResolution(getNameNodeId(i, nnIds[n]), "localhost");
+      }
     }
   }
@@ -123,7 +156,9 @@ private Map<String, Map<String, InetSocketAddress>> getAddressListFromConf(
       TestType type, HdfsConfiguration conf) throws IOException {
     switch (type) {
     case NAMENODE:
-      return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
+      return DFSUtil.getNNServiceRpcAddressesForClusterExcludingObservers(conf);
+    case OBSERVER:
+      return DFSUtil.getObserverNNServiceRpcAddressesForCluster(conf);
     case BACKUP:
       return DFSUtil.getBackupNodeAddresses(conf);
     case SECONDARY:
@@ -166,6 +201,9 @@ private String getAddressListFromTool(TestType type, HdfsConfiguration conf,
     case NAMENODE:
       args[0] = Command.NAMENODE.getName();
       break;
+    case OBSERVER:
+      args[0] = Command.OBSERVER.getName();
+      break;
     case BACKUP:
       args[0] = Command.BACKUP.getName();
       break;
@@ -238,6 +276,10 @@ private static String getNameServiceId(int index) {
     return "ns" + index;
   }
 
+  private static String getNameNodeId(int nsIndex, int nnIndex) {
+    return getNameServiceId(nsIndex) + "nn" + nnIndex;
+  }
+
   /**
    * Test empty configuration
    */
@@ -246,7 +288,8 @@ public void testEmptyConf() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration(false);
     // Verify getting addresses fails
     getAddressListFromTool(TestType.NAMENODE, conf, false);
-    System.out.println(getAddressListFromTool(TestType.BACKUP, conf, false));
+    getAddressListFromTool(TestType.OBSERVER, conf, false);
+    getAddressListFromTool(TestType.BACKUP, conf, false);
     getAddressListFromTool(TestType.SECONDARY, conf, false);
     getAddressListFromTool(TestType.NNRPCADDRESSES, conf, false);
     for (Command cmd : Command.values()) {
@@ -283,8 +326,9 @@ public void testNonFederation() throws Exception {
     // Returned namenode address should match default address
     conf.set(FS_DEFAULT_NAME_KEY, "hdfs://localhost:1000");
     verifyAddresses(conf, TestType.NAMENODE, false, "localhost:1000");
+    verifyAddresses(conf, TestType.OBSERVER, false, "localhost:1000");
     verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:1000");
-
+
     // Returned address should match backupnode RPC address
     conf.set(DFS_NAMENODE_BACKUP_ADDRESS_KEY,"localhost:1001");
     verifyAddresses(conf, TestType.BACKUP, false, "localhost:1001");
@@ -320,35 +364,66 @@ public void testFederation() throws Exception {
     // returned from federation configuration. Returned namenode addresses are
     // based on service RPC address and not regular RPC address
     setupNameServices(conf, nsCount);
-    String[] nnAddresses = setupAddress(conf,
-        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000);
-    setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500);
-    setupStaticHostResolution(nsCount);
+    int[] namenodes = { 0, 1 };
+    int[] observers = { 2, 3 };
+    int[] allNameNodes = new int[namenodes.length + observers.length];
+    System.arraycopy(namenodes, 0, allNameNodes, 0, namenodes.length);
+    System.arraycopy(observers, 0, allNameNodes, namenodes.length, observers.length);
+
+    setupNameNodes(conf, nsCount, false, allNameNodes);
+    setupNameNodes(conf, nsCount, true, observers);
+    setupStaticHostResolution(nsCount, allNameNodes);
+    String[] namenodeAddresses = setupAddress(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000, namenodes);
+    String[] observerAddresses = setupAddress(conf,
+        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1100, observers);
+    String[] allNameNodeAddresses = new String[namenodeAddresses.length + observerAddresses.length];
+    System.arraycopy(namenodeAddresses, 0,
+        allNameNodeAddresses, 0, namenodeAddresses.length);
+    System.arraycopy(observerAddresses, 0,
+        allNameNodeAddresses, namenodeAddresses.length, observerAddresses.length);
+
+    setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500, allNameNodes);
     String[] backupAddresses = setupAddress(conf,
-        DFS_NAMENODE_BACKUP_ADDRESS_KEY, nsCount, 2000);
+        DFS_NAMENODE_BACKUP_ADDRESS_KEY, nsCount, 2000, namenodes);
     String[] secondaryAddresses = setupAddress(conf,
-        DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, nsCount, 3000);
-    verifyAddresses(conf, TestType.NAMENODE, false, nnAddresses);
+        DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, nsCount, 3000, namenodes);
+
+    verifyAddresses(conf, TestType.NAMENODE, false, namenodeAddresses);
+    verifyAddresses(conf, TestType.OBSERVER, false, observerAddresses);
     verifyAddresses(conf, TestType.BACKUP, false, backupAddresses);
     verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses);
-    verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses);
-
+    verifyAddresses(conf, TestType.NNRPCADDRESSES, true, allNameNodeAddresses);
+
     // Test to ensure namenode, backup, secondary namenode addresses and
-    // namenode rpc addresses are returned from federation configuration.
+    // namenode rpc addresses are returned from federation configuration.
     // Returned namenode addresses are based on regular RPC address
     // in the absence of service RPC address.
     conf = new HdfsConfiguration(false);
     setupNameServices(conf, nsCount);
-    nnAddresses = setupAddress(conf,
-        DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1000);
+    setupNameNodes(conf, nsCount, false, allNameNodes);
+    setupNameNodes(conf, nsCount, true, observers);
+    namenodeAddresses = setupAddress(conf,
+        DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1000, namenodes);
+    observerAddresses = setupAddress(conf,
+        DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1100, observers);
+    System.arraycopy(namenodeAddresses, 0,
+        allNameNodeAddresses, 0, namenodeAddresses.length);
+    System.arraycopy(observerAddresses, 0,
+        allNameNodeAddresses, namenodeAddresses.length, observerAddresses.length);
+
     backupAddresses = setupAddress(conf,
-        DFS_NAMENODE_BACKUP_ADDRESS_KEY, nsCount, 2000);
+        DFS_NAMENODE_BACKUP_ADDRESS_KEY, nsCount, 2000, namenodes);
     secondaryAddresses = setupAddress(conf,
-        DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, nsCount, 3000);
-    verifyAddresses(conf, TestType.NAMENODE, false, nnAddresses);
+        DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, nsCount, 3000, namenodes);
+
+    verifyAddresses(conf, TestType.NAMENODE, false, namenodeAddresses);
+    verifyAddresses(conf, TestType.OBSERVER, false, observerAddresses);
     verifyAddresses(conf, TestType.BACKUP, false, backupAddresses);
     verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses);
-    verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses);
+    verifyAddresses(conf, TestType.NNRPCADDRESSES, true, allNameNodeAddresses);
   }
 
   @Test(timeout=10000)
@@ -386,6 +461,7 @@ public void testTool() throws Exception {
       }
     }
   }
+
   @Test
   public void TestGetConfExcludeCommand() throws Exception{
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -431,15 +507,16 @@ public void TestGetConfIncludeCommand() throws Exception{
 
   @Test
   public void testIncludeInternalNameServices() throws Exception {
     final int nsCount = 10;
-    final int remoteNsCount = 4;
     HdfsConfiguration conf = new HdfsConfiguration();
     setupNameServices(conf, nsCount);
-    setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000);
-    setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500);
+    int[] namenodes = { 0, 1 };
+    setupNameNodes(conf, nsCount, false, namenodes);
+    setupStaticHostResolution(nsCount, namenodes);
+    setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000, namenodes);
+    setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500, namenodes);
     conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
-    setupStaticHostResolution(nsCount);
-    String[] includedNN = new String[] {"nn1:1001"};
+    String[] includedNN = new String[] { "ns1nn0:1002", "ns1nn1:1003" };
     verifyAddresses(conf, TestType.NAMENODE, false, includedNN);
     verifyAddresses(conf, TestType.NNRPCADDRESSES, true, includedNN);
   }
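
A minimal sketch (not part of the patch) of the configuration that the test helpers setupNameNodes() and setupAddress() ultimately build for one federated nameservice, assuming DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX resolves to "dfs.ha.observer.namenodes" and using the ns<i>/ns<i>nn<j> naming produced by getNameServiceId()/getNameNodeId(); the class name and literal key strings are illustrative only.

import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ObserverConfLayoutSketch {
  public static void main(String[] args) {
    HdfsConfiguration conf = new HdfsConfiguration(false);
    conf.set("dfs.nameservices", "ns0");
    // dfs.ha.namenodes lists every namenode of the nameservice,
    // ordinary and observer alike.
    conf.set("dfs.ha.namenodes.ns0", "ns0nn0,ns0nn1,ns0nn2,ns0nn3");
    // The new key flags which of those namenodes are observers.
    conf.set("dfs.ha.observer.namenodes.ns0", "ns0nn2,ns0nn3");
    // Per-namenode service RPC addresses use the usual <key>.<nameservice>.<namenode>
    // suffixes, mirroring setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, ...).
    conf.set("dfs.namenode.servicerpc-address.ns0.ns0nn0", "ns0nn0:1000");
    conf.set("dfs.namenode.servicerpc-address.ns0.ns0nn1", "ns0nn1:1001");
    conf.set("dfs.namenode.servicerpc-address.ns0.ns0nn2", "ns0nn2:1100");
    conf.set("dfs.namenode.servicerpc-address.ns0.ns0nn3", "ns0nn3:1101");
    System.out.println("observers: " + conf.get("dfs.ha.observer.namenodes.ns0"));
  }
}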
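The NAMENODE and OBSERVER branches of getAddressListFromConf() exercise the two DFSUtil helpers introduced by this patch. A short sketch of how a caller would use them, assuming they mirror the Map<String, Map<String, InetSocketAddress>> signature of the existing DFSUtil.getNNServiceRpcAddressesForCluster():

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ObserverAddressLookupSketch {
  public static void main(String[] args) throws IOException {
    // Assumes hdfs-site.xml carries the key layout shown in the previous sketch.
    HdfsConfiguration conf = new HdfsConfiguration();
    // NAMENODE: active/standby namenodes only, with observers filtered out.
    Map<String, Map<String, InetSocketAddress>> ordinary =
        DFSUtil.getNNServiceRpcAddressesForClusterExcludingObservers(conf);
    // OBSERVER: only the namenodes listed under dfs.ha.observer.namenodes.*.
    Map<String, Map<String, InetSocketAddress>> observers =
        DFSUtil.getObserverNNServiceRpcAddressesForCluster(conf);
    System.out.println("ordinary: " + ordinary);
    System.out.println("observers: " + observers);
  }
}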
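setStaleRead() in TestObserverNameNode routes the test client through HATestUtil.configureStaleReadFs(). Outside the test harness, the equivalent client-side wiring is the StaleReadProxyProvider plus the stale-read flag described in the commit message. A hedged sketch, assuming the provider is configured per nameservice under dfs.client.failover.proxy.provider.<nameservice> and that "ns0" is the nameservice name:

import java.net.URI;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class StaleReadClientSketch {
  public static void main(String[] args) throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    // Route client RPCs through the stale-read-aware proxy provider for "ns0"
    // (key written out literally; the test does this via HATestUtil.configureStaleReadFs).
    conf.set("dfs.client.failover.proxy.provider.ns0",
        "org.apache.hadoop.hdfs.server.namenode.ha.StaleReadProxyProvider");
    // Opt in to stale reads so read-only calls may be served by an observer.
    conf.setBoolean("dfs.client.enable.stale-read", true);
    try (FileSystem fs = FileSystem.get(new URI("hdfs://ns0"), conf)) {
      // A read-only call; write calls still go to the active namenode.
      fs.getFileStatus(new Path("/"));
    }
  }
}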