[REP] Ray on spark autoscaling #43
Conversation
@jjyao --- LGTM, no high level concerns.
Will continue later.
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
How do we ensure the head node's availability? Does spark terminate the worker node?

We don't. This is out of the autoscaling scope, but it is in the scope of head node HA.

Yes. We launch / terminate Ray worker nodes by starting / canceling spark jobs. Internally, a script starts the Ray worker node as a subprocess and monitors for its parent process dying; if the parent process dies, the script kills the Ray worker node process and all subprocesses of the Ray worker node.
Looks pretty good to me. Just some questions for my own learning.
Integrate the autoscaling feature into the existing `ray.util.spark.cluster_init.setup_ray_cluster` API. The following new arguments are added:

- autoscale: bool (default False)
Would we ever want to start an autoscaling cluster with a non-zero worker count?
In my test, launching a Ray worker node is very fast, so I think starting with zero worker nodes should fulfill most use-cases. It also gives optimal resource utilization. If a user requires a minimum worker count > 0, we can support that in the future.
We launch a spark job with one spark task to hold each Ray worker node, and we set a unique spark job group ID on that spark job. When we need to terminate the Ray worker node, we cancel the corresponding spark job by canceling its spark job group, so that the spark job and its spark task are killed, which triggers the Ray worker node termination.
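The launch/terminate flow above can be sketched with PySpark's job-group API. This is a minimal sketch, not the actual Ray on spark implementation: the function names, and running the worker launcher via `subprocess`, are illustrative assumptions.

```python
import subprocess
import threading


def launch_ray_worker(spark, job_group_id, start_cmd):
    """Launch one Ray worker node as a single-task Spark job tagged with a
    unique job group ID. (Illustrative sketch, not the real implementation.)"""
    def _run():
        sc = spark.sparkContext
        # Job groups are thread-local in Spark, so set the group inside the
        # thread that actually submits the job.
        sc.setJobGroup(job_group_id, "Ray worker node", interruptOnCancel=True)
        # A one-partition RDD yields exactly one Spark task; that task execs
        # the start_ray_node.py-style launcher command.
        sc.parallelize([0], numSlices=1).foreach(
            lambda _: subprocess.check_call(start_cmd))

    t = threading.Thread(target=_run, daemon=True)
    t.start()
    return t


def terminate_ray_worker(spark, job_group_id):
    # Cancelling the job group kills the Spark task, which in turn triggers
    # the Ray worker node's parent-death cleanup (see below).
    spark.sparkContext.cancelJobGroup(job_group_id)
```

Each worker gets its own job group ID, so workers can be terminated individually without touching other spark jobs in the same session.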
When a Ray worker node is terminated, how does that work? Does it kill the raylet, or does it go through something like `ray stop`?
See #43 (comment)
You can see this script: https://github.com/WeichenXu123/ray/blob/autoscale-prototyping/python/ray/util/spark/start_ray_node.py

We launch a Ray worker node by:

- launching a spark job with only one spark task
- the spark task launches the Ray worker node by executing the `start_ray_node.py` script
- when we need to kill the Ray worker node, we kill the spark job, so the spark task is killed; the `start_ray_node.py` script has a `check_parent_alive` thread, and once it detects that its parent process (i.e. the spark task process) has died, it triggers the Ray worker kill routine (killing all processes related to the Ray worker node, cleaning the temp directory if this is the last Ray worker node killed on that spark worker node, etc.)
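The `check_parent_alive` idea can be sketched in plain Python. This is a minimal sketch under an assumption: teardown is modeled as signaling the worker's process group, while the real `start_ray_node.py` does more cleanup (temp directories, per-node bookkeeping).

```python
import os
import signal
import threading
import time


def start_parent_watchdog(poll_interval=1.0):
    """Sketch of a check_parent_alive thread: poll the parent PID and tear
    the Ray worker node down once the parent (the Spark task process) dies."""
    original_ppid = os.getppid()

    def _watch():
        while True:
            # When the parent dies, this process is re-parented (e.g. to
            # PID 1), so getppid() no longer matches the Spark task's PID.
            if os.getppid() != original_ppid:
                # Hypothetical teardown: signal the whole process group so
                # the raylet and its children exit, then exit ourselves.
                os.killpg(os.getpgrp(), signal.SIGTERM)
                os._exit(1)
            time.sleep(poll_interval)

    watcher = threading.Thread(target=_watch, daemon=True,
                               name="check_parent_alive")
    watcher.start()
    return watcher
```

Polling the parent PID rather than relying on `ray stop` means the cleanup fires even if the spark task is killed abruptly, which matches the cancel-the-job-group termination path described above.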
Gotcha, thanks!
we have to have one spark job for each Ray worker node.

One critical point is that the spark node provider runs in the autoscaler process, which is a different process than the one that executes the `setup_ray_cluster` API. The user calls `setup_ray_cluster` in
Is it possible to run this spark application in the autoscaler node provider process?
For Databricks users, we can't: once a Databricks notebook is attached to a spark cluster, the spark session is created in the notebook REPL, the user has to call the `setup_ray_cluster` API in that REPL, and the Ray autoscaler has to reuse the existing spark session.
#### We can use the `ray up` command to set up a Ray cluster with autoscaling. Why don't we call the `ray up` command in the Ray on spark autoscaling implementation?

In Ray on spark, we only provide a python API, `setup_ray_cluster`, and it does not have a CLI. So in the `setup_ray_cluster` implementation, we need to generate the autoscaling config YAML file from the `setup_ray_cluster` argument values, and then launch the Ray head node with the `--autoscaling-config` option. This way, the Ray on spark code can manage the Ray head node process and the Ray worker nodes more easily.
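The config-generation step could look roughly like this. It is a sketch: the field names follow the Ray autoscaler cluster-config schema, but the provider module path is a hypothetical placeholder, and the file is written as JSON (a subset of YAML) to keep the sketch dependency-free.

```python
import json
import os
import tempfile


def generate_autoscaling_config(max_workers, worker_cpus, worker_gpus,
                                path=None):
    """Sketch of building an autoscaler config from setup_ray_cluster
    arguments. The provider "module" below is a placeholder name."""
    config = {
        "cluster_name": "ray-on-spark",
        "max_workers": max_workers,
        "provider": {
            "type": "external",
            "module": "ray.util.spark.SparkNodeProvider",  # placeholder
        },
        "available_node_types": {
            "ray.head": {
                "resources": {"CPU": 0},  # keep tasks off the head node
                "node_config": {},
                "max_workers": 0,
            },
            "ray.worker": {
                "resources": {"CPU": worker_cpus, "GPU": worker_gpus},
                "node_config": {},
                "min_workers": 0,  # proposed default: scale from zero
                "max_workers": max_workers,
            },
        },
        "head_node_type": "ray.head",
    }
    if path is None:
        fd, path = tempfile.mkstemp(suffix=".json")
        os.close(fd)
    # JSON is valid YAML, so this file can be passed to the head node
    # as the "--autoscaling-config" argument.
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    return path, config
```

The generated path would then be passed to `ray start --head --autoscaling-config=<path>`, which is the step the answer above describes.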
So with these 2 constraints, it is still possible to support `ray up` with some config checks and extra work in the `ray up` config generation routine, right? Is there a long-term plan to eventually support the `ray up` CLI?
Yes, we can consider migrating the code to use `ray up` in the future, if there are strong reasons and benefits.
#### How do we make the `NodeProvider` backend support multiple Ray worker nodes running on the same virtual machine?

By default, `NodeProvider` implementations implement the `internal_ip` and `external_ip` methods by converting a `node_id` to an IP, and different nodes must have different IP addresses,
Yeah, this is something we are factoring away, likely in Ray 2.9 / 2.10 by the end of this year. The changed semantics will use an instance ID that can be defined by the node provider.
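One way a provider could decouple node ids from IPs, so several Ray worker nodes can share one VM, is a composite node id. This is a sketch only: the `<ip>-<index>` encoding is an assumption for illustration, not the actual Ray on spark scheme.

```python
# Hypothetical node-id scheme: "<ip>-<index>" identifies the index-th Ray
# worker node on the Spark worker VM at that IP, so several nodes can map
# back to one shared host IP.
def internal_ip(node_id: str) -> str:
    """Recover the shared host IP from a composite node id."""
    ip, _, _index = node_id.rpartition("-")
    return ip
```

With this kind of encoding, two workers on one VM ("10.0.0.5-0" and "10.0.0.5-1") remain distinct node ids while `internal_ip` still returns a valid address for both.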
Good!
#### The Ray autoscaler supports setting multiple Ray worker groups; each Ray worker group has its own CPU / GPU / memory resource configuration and its own minimum / maximum worker counts for autoscaling. Shall we support this feature in Ray on spark autoscaling?

Current use-cases only require all Ray worker nodes to have the same shape,
Curious, what would the extra work or limitations be to support multiple node types?
We can support it by adding some code to Ray on spark (extending the autoscaler config file generation to emit multiple worker groups), but we haven't received related requests from our Databricks customers, so we are not implementing the feature for now.
#### What default Ray on spark minimum worker number should we use?

I propose to set it to zero. Ray worker node launching is very quick, and setting it to
Curious, how quick will this be?
In my test on the Databricks platform, it only took several seconds to start a Ray worker node.
Gotcha. I guess that's because it's just starting another process plus some RPC calls on the same node?
Merging since the vote has passed. Thanks @WeichenXu123 for the contribution!