[Core][DAG] Deprecate DAGNode.execute() #63716
Code Review
This pull request introduces a class-level cache (_remote_func_cache) in FunctionNode to prevent re-registering the same function in GCS KV on every execution, along with corresponding unit tests. The reviewer pointed out that using a class-level dictionary to store RemoteFunction objects creates a permanent memory leak because it holds strong references to the function bodies, preventing garbage collection. The reviewer suggested storing the cached RemoteFunction directly as an attribute on the function body itself (with a fallback to the class-level cache for objects that do not support setting attributes) and provided corresponding updates for the unit tests.
+ func_id = id(self._body)
+ if func_id not in FunctionNode._remote_func_cache:
+     FunctionNode._remote_func_cache[func_id] = ray.remote(self._body)
  return (
-     ray.remote(self._body)
+     FunctionNode._remote_func_cache[func_id]
      .options(**self._bound_options)
      .remote(*self._bound_args, **self._bound_kwargs)
  )
Using a class-level dictionary (_remote_func_cache) to store RemoteFunction objects creates a permanent memory leak.
Because RemoteFunction holds a strong reference to the underlying function body (self._body), and the class-level cache holds a strong reference to the RemoteFunction, any function executed via a DAG will never be garbage collected. In long-running applications that dynamically define functions or use closures, this will lead to unbounded memory growth.
To fix this, we can attempt to store the cached RemoteFunction directly as an attribute on the function body itself. This ties the lifetime of the cached RemoteFunction to the function body. For objects that do not support setting attributes (like built-ins), we can fall back to the class-level cache.
Suggested change:
- func_id = id(self._body)
- if func_id not in FunctionNode._remote_func_cache:
-     FunctionNode._remote_func_cache[func_id] = ray.remote(self._body)
- return (
-     FunctionNode._remote_func_cache[func_id]
-     .options(**self._bound_options)
-     .remote(*self._bound_args, **self._bound_kwargs)
- )
+ func = self._body
+ try:
+     if not hasattr(func, "_ray_remote_func"):
+         func._ray_remote_func = ray.remote(func)
+     remote_func = func._ray_remote_func
+ except AttributeError:
+     func_id = id(func)
+     if func_id not in FunctionNode._remote_func_cache:
+         FunctionNode._remote_func_cache[func_id] = ray.remote(func)
+     remote_func = FunctionNode._remote_func_cache[func_id]
+ return (
+     remote_func
+     .options(**self._bound_options)
+     .remote(*self._bound_args, **self._bound_kwargs)
+ )
+ assert len(FunctionNode._remote_func_cache) == 0
+
+ ray.get(node.execute())
+ assert len(FunctionNode._remote_func_cache) == 1
+ assert id(double) in FunctionNode._remote_func_cache
+
+ # Execute again - cache size should NOT increase
+ ray.get(node.execute())
+ assert len(FunctionNode._remote_func_cache) == 1
Update the test assertions to verify the attribute-based caching on the function body instead of checking the class-level cache size.
Suggested change:
- assert len(FunctionNode._remote_func_cache) == 0
- ray.get(node.execute())
- assert len(FunctionNode._remote_func_cache) == 1
- assert id(double) in FunctionNode._remote_func_cache
- # Execute again - cache size should NOT increase
- ray.get(node.execute())
- assert len(FunctionNode._remote_func_cache) == 1
+ assert not hasattr(double, "_ray_remote_func")
+ ray.get(node.execute())
+ assert hasattr(double, "_ray_remote_func")
+ # Execute again - cache should still be active
+ ray.get(node.execute())
+ assert hasattr(double, "_ray_remote_func")
+ # 50 executions but only 1 cache entry
+ assert len(FunctionNode._remote_func_cache) == 1
41bf907 to e970ee2
a2cc37b to 1860443
I don't think this [...]
Thanks for the feedback! That makes sense: if [...] Would you prefer: [...]
@Phucvt123 let's add a deprecation warning and leave it around for a couple of releases before we remove [...]
Signed-off-by: Vũ Trần Phúc <Vuphuccc@gmail.com>
498aa23 to 412656d
c447594 to da0fa9e
da0fa9e to a5e0d96
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Pushed a change to adjust the warning message. Please also update the PR title and description.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit f0bf17a.
@edoakes Thanks! I have updated the PR title and description to reflect the deprecation of [...] I also fixed a minor syntax error in the warning message and switched it to [...]
## Why are these changes needed?

In the non-compiled DAG path, calling `DAGNode.execute()` executes `FunctionNode.execute()`, which dynamically defines a new remote function via `ray.remote(self._body)` on every execution. This exports new metadata to the GCS KV store on every run, leading to an unbounded memory leak (GCS KV leak) during the job lifetime (ray-project#63666).

Since the non-compiled DAG execution path is not recommended for production and is a primary source of this leak, we are deprecating `DAGNode.execute()` in favor of the compiled DAG API.

This PR adds a `DeprecationWarning` to `DAGNode.execute()` to warn users that it is deprecated and will be removed in a future release.

## Related issue number

Closes ray-project#63666

---------

Signed-off-by: Vũ Trần Phúc <Vuphuccc@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>

## Why are these changes needed?

In the non-compiled DAG path, calling `DAGNode.execute()` executes `FunctionNode.execute()`, which dynamically defines a new remote function via `ray.remote(self._body)` on every execution. This exports new metadata to the GCS KV store on every run, leading to an unbounded memory leak (GCS KV leak) during the job lifetime (#63666).

Since the non-compiled DAG execution path is not recommended for production and is a primary source of this leak, we are deprecating `DAGNode.execute()` in favor of the compiled DAG API.

This PR adds a `DeprecationWarning` to `DAGNode.execute()` to warn users that it is deprecated and will be removed in a future release.

## Related issue number

Closes #63666
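
For readers unfamiliar with the pattern, here is a minimal sketch of a method that emits a `DeprecationWarning` while still doing its work, plus one way a test might capture the warning. It is not the code from this PR: `Node` is a hypothetical stand-in for a DAG node, `run_and_capture` is a made-up helper, and the message text is invented for illustration.

```python
import warnings


class Node:
    """Hypothetical stand-in for a DAG node; not Ray's actual class."""

    def __init__(self, fn, *args):
        self._fn = fn
        self._args = args

    def execute(self):
        # stacklevel=2 attributes the warning to the caller of execute(),
        # which is the line the user needs to change.
        warnings.warn(
            "Node.execute() is deprecated and will be removed in a future release.",
            DeprecationWarning,
            stacklevel=2,
        )
        # The deprecated path keeps working until it is actually removed.
        return self._fn(*self._args)


def run_and_capture(node):
    """Execute the node and return (result, list of recorded warnings)."""
    with warnings.catch_warnings(record=True) as caught:
        # Record every warning, even categories that are filtered by default.
        warnings.simplefilter("always")
        result = node.execute()
    return result, caught
```

Python's default filters hide `DeprecationWarning` unless it is triggered by code running in `__main__`, so users calling the method from library code may need `-W default` or an explicit filter to see it.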