[State Observability] Basic functionality for centralized data #23744
dashboard/modules/node/node_head.py
Outdated
@@ -319,6 +320,11 @@ def process_error(error_data):
            except Exception:
                logger.exception("Error receiving error info from GCS.")

    @routes.get("/api/v0/nodes")
    async def get_nodes(self, req) -> aiohttp.web.Response:
        nodes = await self._dashboard_head.gcs_state_aggregator.get_nodes()
What's returned for node? Are we returning the node id as hex string?
I believe it returns a hex string. I will add tests.
This is ready for the initial review. I will add
The biggest question is how to avoid duplicated implementation, and whether the current way of generating the schema is the right approach (I assumed we cannot use Pydantic and need to stick to the aiohttp implementation for now).
    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)

    @routes.get("/api/v0/placement_groups")
Originally, I was thinking of creating a state_head.py file and adding all the routes there, but the current codebase makes that difficult. It assumes that modules cannot communicate with each other directly through an interface and that only data is shared. This means that if we want to use some data from the job module, that is not possible within the state module.
Should we at least combine all the routes that use gcs_state_aggregator into a single head.py?
I think the code will be structured this way:
Routes (per each state)
------------------------
Post Processing (e.g., limit)
------------------------
Aggregator (from agent, GCS, and raylet)
so grouping routes by aggregator seems a bit unnatural to me. Wdyt?
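The layering described above could be sketched roughly as follows. This is a minimal, hypothetical illustration: `FakeAggregator`, `apply_limit`, and `handle_list_actors` are made-up names rather than code from this PR, and the aggregator returns canned data instead of querying the agent, GCS, or raylet.

```python
import asyncio
from typing import Dict, List


class FakeAggregator:
    """Bottom layer: stands in for the component that gathers state.

    Hypothetical; the real aggregator would query the agent, GCS, and raylet.
    """

    async def get_actors(self) -> List[Dict[str, str]]:
        # Canned entries that contain only IDs.
        return [{"actor_id": f"{i:04x}"} for i in range(5)]


def apply_limit(entries: List[Dict[str, str]], limit: int) -> List[Dict[str, str]]:
    """Middle layer: post-processing applied after aggregation."""
    return entries[:limit]


async def handle_list_actors(aggregator: FakeAggregator, limit: int) -> Dict[str, list]:
    """Top layer: the logic a per-state route handler would call."""
    entries = await aggregator.get_actors()
    return {"result": apply_limit(entries, limit)}


response = asyncio.run(handle_list_actors(FakeAggregator(), limit=2))
```

In this arrangement each route depends only on the post-processing helpers and on whichever aggregator methods it needs, which is why grouping routes by state rather than by aggregator falls out naturally.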
No strong preference here. Initially I was thinking it would be nice to have all public APIs in a single place so people can find them easily.
Yeah, totally on the same page. I would prefer to go that route, but we might need to allow modules to share an interface in that case. I will look into it in the near future, but for now I will stick to the current style if you don't have a strong preference here!
python/ray/experimental/state/api.py
Outdated
    return _list("nodes", ListApiOptions(limit=limit, timeout=timeout), address=address)


def list_jobs(address: str = None, limit: int = 1000, timeout: int = 30):
Right now address is the API server address. Should we accept the GCS address instead? (That seems to be how other APIs do it.)
python/ray/experimental/state/api.py
Outdated
# TODO(sang): Replace it with auto-generated methods.
def list_actors(address: str = None, limit: int = 1000, timeout: int = 30):
Does the current style make sense? It accepts each option in the API spec as a separate argument. https://docs.google.com/document/d/1eyvSPnYgXBdEXB2-qm_gKDDfO2eEnHe7R1Mu6y-9FWU/edit#
:dogscience:
@rkooo567 could you make the doc public?
@@ -230,6 +230,11 @@ async def kill_actor(self, req) -> aiohttp.web.Response:

        return rest_response(success=True, message=f"Killed actor with id {actor_id}")

    @routes.get("/api/v0/actors")
    async def get_actors(self, req) -> aiohttp.web.Response:
How are we supporting list options like timeout and limit?
I haven't added the code yet (sorry, my bad). limit will be applied in post-processing (after aggregation), and timeout will determine the internal RPC timeout.
It will be handled in a separate PR. If you'd like me to, I can remove the options from this PR.
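As a rough sketch of how the two options might interact once implemented, the snippet below applies a timeout to a simulated internal RPC and then truncates the aggregated result to the limit. Everything here is hypothetical (`fake_rpc`, `list_with_options`, and the choice to return an empty list on timeout are assumptions, not the behavior of this PR or its follow-up).

```python
import asyncio
from typing import List


async def fake_rpc(delay_s: float) -> List[str]:
    """Simulates an internal RPC that takes delay_s seconds to respond."""
    await asyncio.sleep(delay_s)
    return ["a", "b", "c", "d"]


async def list_with_options(delay_s: float, limit: int, timeout: float) -> List[str]:
    try:
        # timeout bounds how long we wait for the internal RPC.
        entries = await asyncio.wait_for(fake_rpc(delay_s), timeout=timeout)
    except asyncio.TimeoutError:
        # Assumption for this sketch: a timed-out source contributes nothing.
        return []
    # limit is applied only after aggregation, as post-processing.
    return entries[:limit]


fast = asyncio.run(list_with_options(0.0, limit=2, timeout=1.0))
slow = asyncio.run(list_with_options(0.5, limit=2, timeout=0.01))
```

A real implementation would probably surface the timeout to the caller instead of silently returning an empty list; the empty list just keeps the example short.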
def list_workers(api_server_url: str = None, limit: int = 1000, timeout: int = 30):
    return _list(
        "workers",
        ListApiOptions(limit=limit, timeout=timeout),
no pagination?
We don't plan to support pagination within the scope of this project (we will only limit the output). Pagination could be a follow-up project if it is required.
@raulchen I need to make some modifications to the doc before making it public. Note that this is the project described in the state observability RFC, #22833. The high-level idea is that we will expose all states of Ray (an extended version of global_state), and we plan to use the API server to do this. I will create an RFC by the end of this week unless other priorities come up.
@jjyao So, these items will be done in a follow up;
# TODO(sang): Replace it with auto-generated methods.
def _list(resource_name: str, options: ListApiOptions, api_server_url: str = None):
@edoakes should I implement this using SubmissionClient?
All comments are addressed.
    return r.json()["data"]["result"]


def list_actors(api_server_url: str = None, limit: int = 1000, timeout: int = 30):
Do we have any guidelines here? I feel limit and timeout would be better as kwargs.
Aren't these already kwargs? Or are you saying we should use list_actors(**kwargs)?
Like adding a * to force the arguments after it to be keyword-only.
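For reference, a bare `*` in a Python signature makes every parameter after it keyword-only. The sketch below borrows the `list_actors` name and defaults from the diff, but the body and the example URL are placeholders for illustration only.

```python
from typing import Optional


def list_actors(api_server_url: Optional[str] = None, *, limit: int = 1000, timeout: int = 30):
    # Placeholder body: echo the arguments instead of calling an API server.
    return {"api_server_url": api_server_url, "limit": limit, "timeout": timeout}


# Passing limit by keyword works as before.
ok = list_actors("http://localhost:8265", limit=10)

# Passing limit positionally is rejected with a TypeError.
try:
    list_actors("http://localhost:8265", 10)
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

This keeps call sites readable and lets new options be added later without changing the meaning of existing positional calls.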
…ized data (ray-project#23744)" (ray-project#23918)" This reverts commit fb14e82.
Why are these changes needed?
Support listing actor/pg/job/node/workers
Design doc: https://docs.google.com/document/d/1IeEsJOiurg-zctOcBjY-tQVbsCmURFSnUCTkx_4a7Cw/edit#heading=h.9ub9e6yvu9p2
Note that this PR doesn't contain any output except ids. I will update them in the follow-up PRs.
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.