Add support for multiple YARN masters #595

Closed
kevin-bates opened this issue Mar 8, 2019 · 23 comments · Fixed by #607

Comments

@kevin-bates
Member

When YARN is configured for HA, it requires that multiple masters be specified. As a result, admins of EG should be able to specify multiple masters or none at all (in which case, the yarn-api-client library uses local Hadoop config files).

Currently EG requires that a yarn_endpoint be configured (--EnterpriseGatewayApp.yarn_endpoint=http://hostname:port/ws/v1/cluster), yet, now that the underlying library has been enhanced, EG only uses the hostname from that value. Instead, we should add a configuration option, --EnterpriseGatewayApp.yarn_masters, which is a list-valued property whose default value is the empty list.

If the list is empty, the underlying library will use local configuration files. If it contains a single entry, EG will use that host. If it contains multiple entries, we could either be optimistic and use the first one (with some recovery to the others) or we could ensure the one used is an active master via the ws/v1/cluster/info REST API. We should also retain the selected (or single) master in a static variable relative to the scope (remember that yarn hosts could appear in kernelspec configs) so that we start with that verified value for the next applicable kernel. (Unconditionally verifying the master when multiple exist is probably the easiest and most stable approach.)

We should also try to switch away from downed masters for already running kernels, otherwise things like termination might fail - assuming the scope of that effort is not too great.
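
For reference, a minimal sketch (hypothetical helper name, requests-based) of what the ws/v1/cluster/info verification could look like:

    import requests

    def is_active_rm(host, port=8088, timeout=5):
        """Return True if the RM at host:port reports haState == 'ACTIVE'.

        Hypothetical helper for illustration only; non-HA RMs that omit
        haState are treated as active.
        """
        url = "http://{}:{}/ws/v1/cluster/info".format(host, port)
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            cluster_info = response.json().get("clusterInfo", {})
            return cluster_info.get("haState", "ACTIVE") == "ACTIVE"
        except requests.RequestException:
            return False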

@saipradeepkumar

@kevin-bates

As you said, we can take the list of YARN endpoints. Whenever we have to pick a YARN endpoint, we can try connecting in round-robin fashion, pick the active one from the list, and assign it to a variable that we can use while getting the status.
For this we need two functions:

  1. Selecting the active RM from the list.
  2. Assigning the active YARN endpoint to the variable.

Am I correct?

@kevin-bates
Member Author

Yes. However, we may want to do this in the yarn-api-client layer since it already uses the /cluster endpoint for the case when no address is specified.

That said, I like the idea of dealing with the list in EG because, ultimately, it would be nice not to have to rely on local Hadoop config files (although Spark might need those regardless). I'm not that familiar with the Spark configuration relative to YARN.

Adding @lresende and @akchinSTC in case they have input here.

@lresende
Member

The code should not just pick a resource manager, but actually discover which resource manager is currently the active one. See some potential ways to discover this here; we should identify the approach that is platform/distribution independent.

@kevin-bates
Member Author

kevin-bates commented Mar 13, 2019

I believe @saipradeepkumar is saying a similar thing. The idea is that we'd identify the active RM via the haState. I referenced the cluster info REST details in the issue description above.

The link you provided is good. It would require additional knowledge beyond the RM node(s). Not sure which is better. I like the idea of just adding additional RMs to the yarn_masters list-valued property, such that if you're in an HA cluster, you just add them to the same list. However, if you didn't add them all to the list, then EG could be in a position where none are haState=ACTIVE and it starves itself. So, I have to agree that adding additional - and separate - config values (that imply HA is configured - like the cluster name) are probably necessary.

I do want to, for this exercise, change the yarn-endpoint config value to a single (or multiple) value that is just a host name.

@saipradeepkumar

@lresende
@kevin-bates
When HA is enabled, there are two ways to switch the standby RM to active:
1) Automatic failover can be enabled.
2) An admin can do it manually.
While reading blogs on how to get the active RM, what struck me is hitting the ZooKeeper API, which holds good in both cases mentioned above.
One thing we have to consider is that if we use the ZooKeeper API, we are introducing a new dependency into the package.

@kevin-bates
Member Author

I'd prefer we not introduce another dependency when we can get the same information using the /cluster endpoint. The one use-case that (slightly) concerns me is that if an additional stand-by node is added, the admin needs to remember to update the EG configuration, but that's a significant update since I would imagine the cluster needs to come down to update yarn-site.xml (is that true?).

At any rate, I think the constructor for YarnClusterProcessProxy should determine the active RM when multiple YARN masters are configured. When it locates the active RM, that value should be placed into a global variable, such that the next instance creation uses that value to start its determination of the active master - then cycles to the next. Maintaining a simple array index (modulo length) should do the trick for the "last active master". If an active master cannot be determined (when multiple values are configured), we should throw an exception with an applicable message that no active master could be found. (For single-master configurations, no check is performed so the request will presumably fail downstream.)
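
A rough sketch of that selection logic - names are hypothetical, and the probe function would be something like the is_active_rm() sketch in the issue description above:

    # Module-level state so the next kernel start begins its search at the
    # last known active master (hypothetical names, for illustration only).
    _last_active_index = 0

    def select_active_rm(yarn_masters, is_active):
        """Return the active RM host from the configured list, or raise.

        is_active is a probe callable, e.g. the is_active_rm() sketch above.
        """
        global _last_active_index
        if len(yarn_masters) == 1:
            # Single master: no check; any failure surfaces downstream.
            return yarn_masters[0]
        for offset in range(len(yarn_masters)):
            index = (_last_active_index + offset) % len(yarn_masters)
            if is_active(yarn_masters[index]):
                _last_active_index = index
                return yarn_masters[index]
        raise RuntimeError(
            "No active YARN resource manager found among: {}".format(yarn_masters))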

I'd be happy to put something together, but could only test it in a contrived env (using a fake "down" server, etc.) with one working master. Let me know.

@kevin-bates
Member Author

@saipradeepkumar, @lresende - I had some time and went ahead and implemented my thoughts on this. Let me know what you think. Once we settle on something, I'll update the docs to include the new option (and remove the old).

@saipradeepkumar

@kevin-bates It sounds good to me; we can go ahead and implement this feature.

@hansohn

hansohn commented Mar 20, 2019

The yarn-api-client package already has functionality to pull yarn resource manager id(s) from yarn-site.xml and determine active rm. In ver 0.3.3 you can use env var HADOOP_CONF_DIR to define where the yarn-site.xml is located. If you are using a Hadoop distro, chances are this env is already defined. Why not modify existing code to use functionality already defined?

you can find the reference functionality here

@kevin-bates
Member Author

Thanks for the comment. That "cluster check" only occurs if no address is provided and doesn't check the haState, only that the response is received.

I tend to agree that this kind of thing should be in yarn-api-client, but felt an external approach may be more flexible. The other thing I want to prevent is pinging the standby master each time a new kernel is started. That is, we should have a means of remembering the last valid master across kernel instances.

@kevin-bates
Member Author

@hansohn - you got me thinking (and looking into RM HA a bit) - thank you.

  1. RM HA is only a two-node thing, ACTIVE and STANDBY. You can't have multiple STANDBYs. As a result, it's probably not a huge deal to get stuck always checking the STANDBY if it's first in the list. Probably a couple hundred milliseconds amortized over the time it takes to start a kernel in a cluster (5-15 seconds).
  2. spark-submit must rely on either HADOOP_CONF_DIR or YARN_CONF_DIR, which, in the context of launching kernels could be different values than those accessible by EG/yarn-api-client. The use-case being that an EG admin has defined the kernel to hit a different YARN cluster than the default config.

I think you're correct in that we can get a long way here by NOT setting an address - although I feel the code in _check_is_active_rm() should check haState='ACTIVE', at least when called from the ResourceManager constructor (probably not necessary from get_webproxy_host_port() since it's possible that it chooses a STANDBY RM).

Regarding the spark-submit point, this implies we would need the ability to pass in another (alternate) address. Rather than overload the address parameter on the ResourceManager constructor to handle it being a list, I'm inclined to add another address/port pair...

    def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088, timeout=30, kerberos_enabled=False):

Then refactor the constructor such that if two addresses are provided the active state is checked - either by promoting _check_is_active_rm() to a public method or wrapping it so that this case skips the config lookup code. (I'd lean toward its promotion.)

This way, the 90% case is handled by simple removal of the address (and EG should NOT provide defaults for yarn endpoint config values), while also allowing the case where the caller is targeting multiple yarn clusters.

I would also advocate for a new method on ResourceManager, get_active_host_port(), so that the caller can use that information for troubleshooting, informational purposes, etc.
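
To make the proposal concrete, here's a rough sketch (not the actual yarn-api-client implementation; the config-file fallback and the rest of the class internals are elided) of how the constructor and the proposed accessor might behave:

    import requests

    class ResourceManager(object):
        """Illustrative sketch of the proposed alt_address handling."""

        def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088,
                     timeout=30, kerberos_enabled=False):
            self.timeout = timeout
            self.kerberos_enabled = kerberos_enabled
            active_host, active_port = address, port
            if address and alt_address and not self._is_active_rm(address, port):
                # Two candidates given and the first is not active: use the alternate.
                active_host, active_port = alt_address, alt_port
            # When no address is given, the real client falls back to the
            # yarn-site.xml / HADOOP_CONF_DIR lookup (omitted here).
            self.active_host, self.active_port = active_host, active_port

        def _is_active_rm(self, host, port):
            # Same haState probe sketched earlier in the thread.
            try:
                resp = requests.get(
                    "http://{}:{}/ws/v1/cluster/info".format(host, port),
                    timeout=self.timeout)
                resp.raise_for_status()
                return resp.json().get("clusterInfo", {}).get("haState") == "ACTIVE"
            except requests.RequestException:
                return False

        def get_active_host_port(self):
            """Proposed accessor so callers can see which RM was selected."""
            return self.active_host, self.active_port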

Comments?

cc: @lresende, @saipradeepkumar

@lresende
Member

Rather than overload the address parameter on the ResourceManager constructor to handle it being a list, I'm inclined to add another address/port pair...

    def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088, timeout=30, kerberos_enabled=False):

Regarding this particular design, I think it's not extensible. In the YARN API Client project today, there is a PR to add http/https support which, following this pattern, would add address_use_https, alt_address_use_https, etc. Having a single parameter where you can pass one or a list of service endpoints like "http(s)://host:port" might be a good consideration and provide a little more flexibility and extensibility.
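
A minimal sketch of what normalizing such an endpoint list could look like (function name, example hostnames, and default port are illustrative only):

    from urllib.parse import urlsplit

    def parse_endpoints(endpoints):
        """Accept one endpoint string or a list; return (scheme, host, port) tuples."""
        if isinstance(endpoints, str):
            endpoints = [endpoints]
        parsed = []
        for endpoint in endpoints:
            parts = urlsplit(endpoint)
            scheme = parts.scheme or "http"
            port = parts.port or 8088  # default shown for illustration only
            parsed.append((scheme, parts.hostname, port))
        return parsed

    # e.g. parse_endpoints(["https://rm1.example.com:8090", "https://rm2.example.com:8090"])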

@hansohn

hansohn commented Mar 20, 2019

@kevin-bates I manage a few yarn HA clusters and was originally writing a patch to account for HA because I didn't want to define a new value for EnterpriseGatewayApp.yarn_endpoint every time the resource manager failed over. Lots of things have happened since I originally went down this path and I haven't been able to complete the feature and submit a PR.

In regards to the points you mentioned above:

  1. Most Hadoop applications follow the same process of running through the yarn resource manager ids defined in yarn.resourcemanager.ha.rm-ids, translating each id to a hostname, checking if the address is available, and failing over to the next defined id if not. There is latency involved in this but it's currently the standard way of doing things. Even YARN follows this process internally.
  2. You're right that HADOOP_CONF_DIR or YARN_CONF_DIR can be provided to determine yarn-site.xml location. I don't typically see the YARN_CONF_DIR env var used much which is why I submitted the PR to yarn-api-client with the former. As far as yarn clusters go though, in my experience you wouldn't typically have multiple yarn clusters. You would have a single cluster with multiple queues that have different resources and permissions assigned. Even in the case where you have multiple dev/stg/prod clusters there are typically data boundaries between those environments and you end up with separate analytical environments as well.

The code in _check_is_active_rm() doesn't need to check if the cluster is HA or not because whether you have a single resource manager or many, rm-ids are defined the same way.

spark-submit handles yarn ha internally by translating the values of the rm-ids it finds in the yarn-site.xml when you define the HADOOP_CONF_DIR env var. So you shouldn't need to pass multiple addresses to it.

The biggest problem I have found with the above architecture is that the enterprise_gateway service will need to be bounced in order to trigger the call to yarn-api-client to get the active resource manager. It would be nice to adapt to a change like that at notebook runtime but I am not familiar enough with the codebase to know whether those sort of hooks are available or not.

Thank you for working on this feature! We use Yarn and Jupyter at my company and figuring out the HA implementation is a hurdle for us in order to be able to implement Enterprise Gateway into our workflow. Looking forward to the new features!

Cheers!

@kevin-bates
Member Author

kevin-bates commented Mar 21, 2019

@lresende and @hansohn - thank you for the responses - they were helpful.

@lresende - I agree there's an extensibility issue here in terms of https coupled with the alternate addresses - although I think you'd only need a single use_https parameter - similar to kerberos_enabled - since it's highly unlikely you'd have one node configured for https and another not.

I chose the additional-parameters approach because it's completely backward compatible and doesn't require updates to the other classes (for consistency purposes, since ResourceManager is the only class that should entertain multiple addresses). I think a switch to a fully qualified URI (sans the endpoint, although that should probably be part of it as well) would be disruptive and, not being a person close to yarn-api-client, I don't really feel it's my place to make such a change. That said, I'd be happy to crank that stuff out, but I don't feel qualified to make that kind of decision.

@hansohn - Yes, the bounce of EG is required because we currently provide a default value for the yarn endpoint parameter. Once we remove the defaulting behaviors, no restart will be required because we get a new instance of RM for each kernel. We will do that at a minimum! Thanks for raising that.

Heck, if we feel we do not need to target multiple clusters at all, then we can get support for HA by subtraction! We simply REMOVE the endpoint configuration item - since we're required to have a CONF_DIR on the EG server in SPARK_HOME anyway. (@lresende - is that a true statement?) We'd just document that HADOOP_CONF_DIR be specified and never pass an address/port when creating an RM.

This would mean no changes to the ResourceManager constructor would be required as well.

The code in _check_is_active_rm() doesn't need to check if the cluster is HA or not because whether you have a single resource manager or many, rm-ids are defined the same way.

Hmm - I still think the explicit check is required when len(rm_ids) > 1, because if the first id in the list is the standby RM, a status of OK will be returned. Are you saying that the standby RM will have a 'Refresh' in the response header of non-None, while the active RM does not? If so, that's great!

@kevin-bates
Member Author

Looks like this answers the response from the standby relative to Refresh in the header: http://mail-archives.apache.org/mod_mbox/ambari-user/201508.mbox/%3C09D941A9-4A4E-4153-A2D2-CCB57ACC332F@hortonworks.com%3E
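
If that holds, a minimal heuristic sketch (hypothetical helper; the idea being that a standby RM answers with a 'Refresh' header pointing at the active RM, while the active RM does not):

    import requests

    def responded_as_standby(host, port=8088, timeout=5):
        """Heuristic from the thread: a standby RM includes a 'Refresh' header."""
        response = requests.get(
            "http://{}:{}/ws/v1/cluster/info".format(host, port),
            timeout=timeout, allow_redirects=False)
        return response.headers.get("Refresh") is not None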

@gss2002

gss2002 commented Apr 11, 2019

So does Enterprise Gateway support YARN HA out of the gate, or is it not supported and you have to hardcode the active RM?

@kevin-bates
Member Author

That's the plan (and soon). For current releases, however, you have to hardcode the active RM.

With PR #623 (merged into master) you can achieve YARN HA support if EG is running on an edge node (i.e, HADOOP_CONF_DIR is available) and you set the EG_YARN_ENDPOINT to None. In this case, the yarn-api-client package will determine the active RM.
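
For that first case, a minimal configuration sketch (file location illustrative; the EG_YARN_ENDPOINT env var form works the same way):

    # jupyter_enterprise_gateway_config.py (illustrative; `c` is provided by Jupyter)
    # HADOOP_CONF_DIR must already be set in EG's environment so that
    # yarn-api-client can read yarn-site.xml and discover the active RM.
    c.EnterpriseGatewayApp.yarn_endpoint = None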

Once yarn-api-client PR #29 is merged and part of a release, we can then update the EG dependencies via PR #607 and can then set the addresses for the two RMs and yarn-api-client will determine the active RM. In this case, HADOOP_CONF_DIR is not required.

For both of these cases, we plan on updating BOTH the 1.x and 2.x releases.

@gss2002

gss2002 commented Apr 12, 2019

Kevin, is Toree still needed for Scala support within Jupyter?

@kevin-bates
Member Author

EG provides Scala kernelspecs that utilize Toree. You could probably use other Scala kernels for remote interaction, but you'd need to tweak our Toree launcher or model a different launcher after that one.

If you're talking about regular Jupyter support, where kernels run local to the server, then there wouldn't be a requirement that Toree be the Scala kernel.

@lresende
Member

If you're talking about regular Jupyter support, where kernels run local to the server, then there wouldn't be a requirement that Toree be the Scala kernel.

There should not be a difference if you are running locally or remotely, you will need a Scala kernel to support Scala / Spark and Toree is one of those kernels.

@kevin-bates
Member Author

@lresende - when running remote, you'll need a kernel launcher to create the (local) connection file, send it back to EG and listen for interrupts. At this moment, the kernel launcher for Scala kernels is dedicated to Apache Toree. Is there some other way to run remote kernels outside of EG (i.e., regular Jupyter)?

@lresende
Member

@kevin-bates I think I got a little confused by your previous paragraph, which I now seem to get. Toree is a requirement in the context of EG, but with vanilla Jupyter Notebook you can use any Scala kernel...

@kevin-bates
Member Author

Correct - although technically speaking we (EG) support local kernels, so in that case you could run EG with a different Scala kernel provided the kernel only ran locally. As soon as you want it to run remotely, you'll have a kernel launcher issue because the launcher we provide is dedicated to Toree. Sorry for the confusion.
