HDFS observer namenode implementation
This adds a new type of namenode: the observer. An observer is like a standby
NN (in fact they share most of the code), except that it participates in
neither NN failover (i.e., it is not part of the HA setup) nor checkpointing.

An observer is specified through configuration. First, it needs to be
added to the config dfs.ha.namenodes, just like a normal namenode,
together with the other per-namenode configs such as dfs.namenode.rpc-address,
dfs.namenode.http-address, etc. Second, it needs to be listed in a new
config, dfs.ha.observer.namenodes, which differentiates it from the ordinary
active/standby namenodes. A configuration sketch follows.
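
As an illustration only, a minimal server-side setup might look like the
sketch below; the nameservice "ns1", the namenode IDs "nn1"/"nn2"/"nn3", and
the host/port values are placeholders, not part of this patch:

  import org.apache.hadoop.conf.Configuration;

  public class ObserverConfigSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Register every namenode, including the observer "nn3".
      conf.set("dfs.nameservices", "ns1");
      conf.set("dfs.ha.namenodes.ns1", "nn1,nn2,nn3");
      // New key added by this patch: mark "nn3" as an observer.
      conf.set("dfs.ha.observer.namenodes.ns1", "nn3");
      // Per-namenode addresses are configured as usual.
      conf.set("dfs.namenode.rpc-address.ns1.nn3", "observer-host:8020");
      conf.set("dfs.namenode.http-address.ns1.nn3", "observer-host:50070");
    }
  }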

An observer can be used to serve read-only requests from HDFS clients
when the following two conditions are satisfied (a client-side sketch
follows the list):

  1. the config dfs.client.failover.proxy.provider.<nameservice> is
     set to org.apache.hadoop.hdfs.server.namenode.ha.StaleReadProxyProvider.
  2. the config dfs.client.enable.stale-read is set to true
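
For illustration, a minimal client-side sketch, assuming the nameservice is
named "ns1"; the defaultFS value and the path listed are placeholders:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class StaleReadClientSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.set("fs.defaultFS", "hdfs://ns1");
      // Condition 1: use the stale-read-aware proxy provider for "ns1".
      conf.set("dfs.client.failover.proxy.provider.ns1",
          "org.apache.hadoop.hdfs.server.namenode.ha.StaleReadProxyProvider");
      // Condition 2: opt in to stale reads.
      conf.setBoolean("dfs.client.enable.stale-read", true);
      // Read-only calls such as listing may now be served by an observer.
      try (FileSystem fs = FileSystem.get(conf)) {
        fs.listStatus(new Path("/"));
      }
    }
  }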

This also changes the way edit logs are loaded by the standby/observer NNs.
Instead of loading them all at once, the new implementation loads them
one batch at a time (the default batch size is 10K edits) over multiple
iterations, waiting a short amount of time between iterations (the default
wait is 100ms). This ensures the global lock is not held for too long while
loading edits; otherwise, RPC processing time would suffer. A simplified
sketch of the batched loop follows.
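
The following is only a sketch of the batching idea, not the patch's actual
tailing code; the class and method names (BatchedEditTailer, fetchNextBatch,
applyBatchUnderLock) are hypothetical:

  import java.util.List;

  // Apply edits in bounded batches, pausing between batches so the
  // namespace lock is released periodically and RPC handlers can run.
  public abstract class BatchedEditTailer {
    private static final int BATCH_SIZE = 10_000;             // default: 10K edits
    private static final long SLEEP_BETWEEN_BATCHES_MS = 100; // default: 100ms

    public void tailEdits() throws InterruptedException {
      while (true) {
        List<Object> batch = fetchNextBatch(BATCH_SIZE);
        if (batch.isEmpty()) {
          break;
        }
        // The global lock is held only for the duration of one batch.
        applyBatchUnderLock(batch);
        // Let queued RPCs acquire the lock before the next batch.
        Thread.sleep(SLEEP_BETWEEN_BATCHES_MS);
      }
    }

    protected abstract List<Object> fetchNextBatch(int maxEdits);
    protected abstract void applyBatchUnderLock(List<Object> batch);
  }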

This patch does not include a mechanism for clients to bound the staleness
using the journal transaction ID: excluding this allows us to deploy the
observer more easily. More specifically, the deployment involves:

  1. restarting all datanodes with the updated configs. No binary change on
     datanodes is required.
  2. bootstrapping and starting the observer namenode with the updated
     configs. Existing namenodes do not need to change.

Future tasks:

  1. allow clients to set a time-based bound on observer staleness
     (e.g., 2min). If for some reason the lag in edit tailing exceeds the
     bound, the client-side proxy provider will fail all RPCs over to the
     active namenode.
  2. use the journal transaction ID to enforce a bound on staleness. The ID
     can be embedded in the RPC header.
  3. allow new standby/observer to be deployed without datanode restart.
sunchao committed Dec 4, 2017
1 parent c7527fa commit ff29e13
Showing 33 changed files with 1,578 additions and 187 deletions.
@@ -435,4 +435,9 @@ public void close() throws IOException {
public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy());
}

@VisibleForTesting
public FailoverProxyProvider getProxyProvider() {
return proxyDescriptor.fpp;
}
}
@@ -669,7 +669,8 @@ public RetryAction shouldRetry(Exception e, int retries,
e instanceof UnknownHostException ||
e instanceof StandbyException ||
e instanceof ConnectTimeoutException ||
isWrappedStandbyException(e)) {
isWrappedStandbyException(e) ||
isStandbyException(e)) {
return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
getFailoverOrRetrySleepTime(failovers));
} else if (e instanceof RetriableException
@@ -734,4 +735,18 @@ static RetriableException getWrappedRetriableException(Exception e) {
return unwrapped instanceof RetriableException ?
(RetriableException) unwrapped : null;
}

private static boolean isStandbyException(Exception ex) {
Throwable cause = ex.getCause();
if (cause != null) {
Throwable cause2 = cause.getCause();
if (cause2 instanceof RemoteException) {
RemoteException remoteException = (RemoteException)cause2;
IOException unwrapRemoteException =
remoteException.unwrapRemoteException();
return unwrapRemoteException instanceof StandbyException;
}
}
return false;
}
}
@@ -39,6 +39,7 @@
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;

import java.io.IOException;
@@ -128,18 +129,52 @@ public static Collection<String> getNameServiceIds(Configuration conf) {

/**
* Namenode HighAvailability related configuration.
* Returns collection of namenode Ids from the configuration. One logical id
* for each namenode in the in the HA setup.
* Returns collection of namenode Ids (including observer NNs) from the configuration.
* One logical id for each namenode in the HA setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of namenode Ids
* @param nsId the nameservice ID to look at, or null for
* non-federated
* @return collection of namenode Ids, including observer namenodes.
*/
public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
return conf.getTrimmedStringCollection(key);
}

/**
* Returns collection of observer namenode Ids from the configuration.
* One logical id for each observer in the HA setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of observer namenode Ids
*/
public static Collection<String> getObserverNameNodeIds(Configuration conf, String nsId) {
String key = addSuffix(DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX, nsId);
return conf.getTrimmedStringCollection(key);
}

/**
* Namenode HighAvailability related configuration.
* Returns collection of namenode Ids from the configuration, excluding observer namenodes.
* One logical id for each namenode in the HA setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of namenode Ids, excluding observer namenodes.
*/
public static Collection<String> getNameNodeIdsExcludingObservers(
Configuration conf, String nsId) {
Collection<String> allNNIds = getNameNodeIds(conf, nsId);
Collection<String> observerNNIds = getObserverNameNodeIds(conf, nsId);
if (!allNNIds.containsAll(observerNNIds)) {
throw new IllegalArgumentException("Observer NameNodes should be part of all NameNodes");
}
allNNIds.removeAll(observerNNIds);
return allNNIds;
}

/** Add non empty and non null suffix to a key */
static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
@@ -152,15 +187,30 @@ static String addSuffix(String key, String suffix) {

/**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
* the configuration. Note this does not include the NN RPC addresses from
* observer namenodes.
*
* @param conf configuration
* @return list of InetSocketAddresses
* @return list of InetSocketAddresse, not including those of observer NNs.
*/
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return DFSUtilClient.getAddresses(conf, null,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}

/**
* Returns list of InetSocketAddress corresponding to observer namenode RPC
* addresses from the configuration.
*
* @param conf configuration
* @return list of InetSocketAddresses for observer namenodes
*/
public static Map<String, Map<String, InetSocketAddress>> getObserverRpcAddresses(
Configuration conf) {
// Observer namenodes share the same RPC address key with ordinary namenodes
return DFSUtilClient.getObserverAddresses(conf, null,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}

/**
@@ -320,6 +370,15 @@ static String concatSuffixes(String... suffixes) {
return Joiner.on(".").skipNulls().join(suffixes);
}

/**
* Enum for selecting different types of namenodes (e.g., namenode, observers)
*/
enum NameNodeType {

[inline review comment on this line]

xkrogen (Dec 7, 2017): Can we instead leverage HAState / HAServiceState here?

sunchao (author, Dec 8, 2017): We might. But I think it is a bit odd to use HAState or HAServiceState here. Also not sure how to represent the ALL case. Having an enum here is for convenience. It should be OK as long as we don't expose it to other classes.

NAMENODE,
OBSERVER,
ALL
}

/**
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
@@ -329,12 +388,34 @@ static Map<String, Map<String, InetSocketAddress>> getAddresses(
*/
static Map<String, Map<String, InetSocketAddress>> getAddresses(
Configuration conf, String defaultAddress, String... keys) {
return getAddressesInternal(conf, defaultAddress, NameNodeType.NAMENODE, keys);
}

/**
* Returns the configured address for all Observer NameNodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
static Map<String, Map<String, InetSocketAddress>> getObserverAddresses(
Configuration conf, String defaultAddress, String... keys) {
return getAddressesInternal(conf, defaultAddress, NameNodeType.OBSERVER, keys);
}

/**
* Internal function to get configured addresses for a particular namenode
* type 'type'.
*/
static Map<String, Map<String, InetSocketAddress>> getAddressesInternal(
Configuration conf, String defaultAddress, NameNodeType type, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
return getAddressesForNsIdsInternal(conf, nameserviceIds, defaultAddress, type, keys);
}

/**
* Returns the configured address for all NameNodes in the cluster.
* Returns the configured address for both ordinary (active/standby) namenodes
* and observer namenodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
@@ -344,12 +425,23 @@ static Map<String, Map<String, InetSocketAddress>> getAddresses(
static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
Configuration conf, Collection<String> nsIds, String defaultAddress,
String... keys) {
return getAddressesForNsIdsInternal(
conf, nsIds, defaultAddress, NameNodeType.ALL, keys);
}

/**
* Internal function to get configured addresses for namenodes of type 'type' and
* nameservices with IDs 'nsIds'.
*/
static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIdsInternal(
Configuration conf, Collection<String> nsIds, String defaultAddress,
NameNodeType type, String... keys) {
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
for (String nsId : emptyAsSingletonNull(nsIds)) {
Map<String, InetSocketAddress> isas =
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
getAddressesForNameserviceIdInternal(conf, nsId, defaultAddress, type, keys);
if (!isas.isEmpty()) {
ret.put(nsId, isas);
}
@@ -359,7 +451,27 @@ static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(

static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
return getAddressesForNameserviceIdInternal(
conf, nsId, defaultValue, NameNodeType.NAMENODE, keys);
}

private static Map<String, InetSocketAddress> getAddressesForNameserviceIdInternal(
Configuration conf, String nsId, String defaultValue,
NameNodeType type, String... keys) {
Collection<String> nnIds;
switch (type) {
case NAMENODE:
nnIds = getNameNodeIdsExcludingObservers(conf, nsId);
break;
case OBSERVER:
nnIds = getObserverNameNodeIds(conf, nsId);
break;
case ALL:
nnIds = getNameNodeIds(conf, nsId);
break;
default:
throw new IllegalArgumentException("Invalid namenode type: " + type.name());
}
Map<String, InetSocketAddress> ret = Maps.newHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
@@ -71,6 +71,7 @@ public interface HdfsClientConfigKeys {
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
String DFS_HA_OBSERVER_NAMENODES_KEY_PREFIX = "dfs.ha.observer.namenodes";
String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
boolean DFS_WEBHDFS_ENABLED_DEFAULT = true;
String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
@@ -117,6 +118,9 @@ public interface HdfsClientConfigKeys {
String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
"dfs.client.datanode-restart.timeout";
long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
String DFS_CLIENT_ENABLE_STALE_READ =
"dfs.client.enable.stale-read";
boolean DFS_CLIENT_ENABLE_STALE_READ_DEFAULT = false;
// Much code in hdfs is not yet updated to use these keys.
// the initial delay (unit is ms) for locateFollowingBlock, the delay time
// will increase exponentially(double) for each retry.

8 comments on commit ff29e13

@xkrogen commented on ff29e13 Dec 7, 2017

This looks great overall Chao, thanks for the effort. I reviewed the commit except for tests. I left inline comments and have a few high-level ones:

  • For the batch size of edit log transactions, it would probably be better to have it be time-based rather than number-based. Ultimately the administrator configuring this option is interested in bounding the latency increase caused by the write lock hold. I guess the current way is simpler and consistent with things like the batching of lock hold in getContentSummary but it seems more friendly to an administrator to use a time-based bound.
  • Right now there is just a global config to allow or disallow stale reads. This is probably fine for a first pass but ideally there would at least be a way to support (a) dynamically enabling/disabling, (b) initializing two FileSystem objects, one of which has the stale reads enabled and one of which has it disabled. This would give applications a lot more ability for control with small extra effort.
  • With multiple Standbys, normally everyone except the ANN creates checkpoints to make sure that they have a fairly recent fsimage persisted, and one SbNN uploads to the ANN. If the observer doesn't checkpoint, on restart it will be very out of date, probably enough so that edits don't go back that far, meaning an administrator would need to manually copy over a recent fsimage. I think this is something we need to consider further. Should standbys upload checkpoints to observers?
  • Biggest comment is that right now observers are treated very differently from the other NameNodes, e.g. an entirely separate configuration, you cannot transition between an observer and an active or standby, etc. I don't really see a reason to have this artificial restriction - why can't an observer node become an active or standby? I think ideally the Observer state would just be another point in the state machine of Active-Standby-Observer. This would require a little extra handling in the proxy to be able to dynamically determine which NameNodes are observers vs. standbys but seems more robust.

@sunchao (author) commented on ff29e13 Dec 8, 2017

Thanks @xkrogen. These comments are super helpful!

> For the batch size of edit log transactions, it would probably be better to have it be time-based rather than number-based. Ultimately the administrator configuring this option is interested in bounding the latency increase caused by the write lock hold. I guess the current way is simpler and consistent with things like the batching of lock hold in getContentSummary but it seems more friendly to an administrator to use a time-based bound.

I chose to use batch size simply because it is easier to implement, and I agree that a time-based config may be more useful to an administrator. Will think about how to implement that.

> Right now there is just a global config to allow or disallow stale reads. This is probably fine for a first pass but ideally there would at least be a way to support (a) dynamically enabling/disabling, (b) initializing two FileSystem objects, one of which has the stale reads enabled and one of which has it disabled. This would give applications a lot more ability for control with small extra effort.

Yes, this makes sense. The original patch in HDFS-10702 allows dynamic configuration. I changed it to make integration easier with applications such as Hive and Presto: no code change is required on their side. We can add a dynamic way to enable stale reads in the future.

> With multiple Standbys, normally everyone except the ANN creates checkpoints to make sure that they have a fairly recent fsimage persisted, and one SbNN uploads to the ANN. If the observer doesn't checkpoint, on restart it will be very out of date, probably enough so that edits don't go back that far, meaning an administrator would need to manually copy over a recent fsimage. I think this is something we need to consider further. Should standbys upload checkpoints to observers?

You're right. With the current implementation, an administrator needs to bootstrap an observer to bring it up. Uploading checkpoints from the standby is a good suggestion. Do you think it can be done after phase 0?

> Biggest comment is that right now observers are treated very differently from the other NameNodes, e.g. an entirely separate configuration, you cannot transition between an observer and an active or standby, etc. I don't really see a reason to have this artificial restriction - why can't an observer node become an active or standby? I think ideally the Observer state would just be another point in the state machine of Active-Standby-Observer. This would require a little extra handling in the proxy to be able to dynamically determine which NameNodes are observers vs. standbys but seems more robust.

Hmm... the original intention was to exclude the observer from failover. One benefit is that, as I mentioned in one of the comments above, we might not need to take the lock in EditLogTailer#doTailEdits.

Do you think, besides the lock issue, that it is safe to let the observer participate in failover? I'm open to suggestions. :) In fact, having the Active-Standby-Observer state transition may make the implementation simpler.

@xkrogen commented on ff29e13 Dec 8, 2017

  • For the time-based config I think that's probably fine as a follow-on. The batch approach is much simpler for now and is probably 90% as good.
  • Same comment for the dynamic configuration of stale reads.
  • With the current code, I think having administrators being required to bootstrap would probably be fine for phase 0.

I think it is safe to let observer participate in failover, yeah. It should be maintaining all of the same state as a standby. This is a little tricky in combination with the last bullet above, though... Which nodes have up-to-date checkpoints will be dependent on transient state. It may be necessary to properly implement standbys uploading checkpoints to observers if we go down this route.

I agree that having the observer participate in the normal state transition should make implementation easier/cleaner overall and that is part of my motivation for it. It will put a little more complex logic into the proxy provider, but I think that is a good place to concentrate all of the changes.

I will look more at the locking needed to allow the observer to participate in failover. I suspect that we will need to add another lock, or perhaps we can reuse the cpLock, to make things work correctly. Need to study up on HDFS-2693.

@sunchao (author) commented on ff29e13 Dec 8, 2017

Cool. I'll work on revising the patch to allow the observer to participate in failover, and will also address the above comments. Please let me know your findings on the locking issue.

@xkrogen commented on ff29e13 Dec 9, 2017

The locking is interesting. With the current behavior of not allowing failovers, things should be fine; there's some unprotected access to the lastAppliedTxId in FSImage, but since the edit log tailer is the only thing that will touch the txID, we should be fine.

However, once we let the observer participate in failover, we will need to worry about getting this correct. AFAICT the only reason we need synchronization over this whole thing is to make sure we get our lastAppliedTxId set correctly, don't re-apply a tx that has already been applied, etc. So it's all about getting the lastAppliedTxId consistent. Right now we only update the FSImage's lastAppliedTxId in the FSImage#loadEdits() layer, but there's no reason we couldn't pass a reference to FSImage down into FSEditLogLoader#loadEditRecords(). Basically we need to make sure the FSImage has the correct lastAppliedTxId value before releasing the lock. We'll need to add some additional logic to handle the concurrent failover case, like making the write lock acquisition interruptible, but it should all be doable. I can put together a patch for this and have you take a look.
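
A minimal sketch of the idea above, with hypothetical names (BatchedEditApplier, applyBatch; this is not the actual FSImage/FSEditLogLoader code): advance lastAppliedTxId under the same lock that guards edit application, and take the lock interruptibly so a concurrent failover can break in:

  import java.util.List;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  class BatchedEditApplier {
    private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock();
    private volatile long lastAppliedTxId;

    void applyBatch(List<Long> txIds) throws InterruptedException {
      // Interruptible acquisition: a failover can cancel a long tailing pass.
      fsLock.writeLock().lockInterruptibly();
      try {
        for (long txId : txIds) {
          // ... apply the edit for txId (omitted) ...
          // Advance lastAppliedTxId before the lock is released, so nothing
          // ever sees applied edits without the updated transaction id.
          lastAppliedTxId = txId;
        }
      } finally {
        fsLock.writeLock().unlock();
      }
    }
  }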

@sunchao (author) commented on ff29e13 Dec 9, 2017

Sure. I can take a look at the patch once you have it. Meanwhile I'll also try to understand the lock issue on my side.

There are a few other issues that I want to mention:

  1. By "let the observer participate in failover", do you mean that a observer can directly transition to active? or it has to transition to standby first? We may want to avoid the case where the only observer transitioned to active and suddenly all read requests go to active.
  2. Since this work depends on HDFS-6440 and it is only available in 3.0, I wonder if you guys can help to push this into 2.x branches.

@xkrogen

For #1, that's a very interesting point. For each client, as soon as it tries to do a write or a read from the ANN, a client-side failover will be triggered and subsequent requests will go to the correct place. However, clients which are only doing stale reads from a standby could continue to perform those reads from the new ANN for some time. I think it is worth allowing such behavior, but we should attempt to determine how bad the real-world impact will be. Allowing only certain transitions makes things operationally more complex and is a somewhat arbitrary restriction, so I would rather not go that way unless we see evidence that it is necessary.

For the 2.x branches, yes, we would be interested in porting it. But we can focus on trunk for now.

@sunchao (author)

For #1, we may not want unexpected results: e.g., after the failover, the active may no longer be available, or it may have transitioned to a normal standby and thus be unavailable for stale reading. If all read traffic suddenly goes to the active, it may have a negative impact on its performance.
