Skip to content

Commit

Permalink
[Ray] Fix ray memory leak (#3184)
Browse files Browse the repository at this point in the history
* Fix memory leak

* Add too many clients warning for actor caller

* Refine fix

* Trigger CI

* Fix

* Free result message object as soon as possible in _listen

* Fix lint

* Fix lint

Co-authored-by: 刘宝 <po.lb@antfin.com>
Co-authored-by: 刘宝 <po.lb@antgroup.com>
  • Loading branch information
3 people committed Aug 3, 2022
1 parent a9148e7 commit 7417139
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ jobs:
flake8 mars --config=flake8_init.ini
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 mars --config="default" --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
flake8 mars --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
displayName: 'Lint with flake8'
- bash: |
Expand Down
15 changes: 12 additions & 3 deletions mars/deploy/oscar/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,15 @@ def __init__(
self.web_address = None

async def start(self):
logging.basicConfig(
format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO, force=True
)
try:
# Python 3.8 support force argument.
logging.basicConfig(
format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO, force=True
)
except ValueError: # pragma: no cover
logging.basicConfig(
format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO
)
logger.info("Start cluster with config %s", self._config)
# init metrics to guarantee metrics use in driver
metric_configs = self._config.get("metrics", {})
Expand Down Expand Up @@ -499,6 +505,9 @@ async def start(self):
self._config, NodeRole.SUPERVISOR
)

# set global router an empty one.
Router.set_instance(Router(list(), None))

# create supervisor actor pool
supervisor_pool_coro = asyncio.create_task(
create_supervisor_actor_pool(
Expand Down
20 changes: 20 additions & 0 deletions mars/oscar/backends/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import asyncio
import copy
import logging
from typing import Dict, Union

from ...oscar.profiling import ProfilingData
Expand All @@ -25,6 +26,7 @@


ResultMessageType = Union[ResultMessage, ErrorMessage]
logger = logging.getLogger(__name__)


class ActorCaller:
Expand All @@ -41,6 +43,14 @@ async def get_client(self, router: Router, dest_address: str) -> Client:
if client not in self._clients:
self._clients[client] = asyncio.create_task(self._listen(client))
self._client_to_message_futures[client] = dict()
client_count = len(self._clients)
if client_count >= 100: # pragma: no cover
if (client_count - 100) % 10 == 0: # pragma: no cover
logger.warning(
"Actor caller has created too many clients (%s >= 100), "
"the global router may not be set.",
client_count,
)
return client

async def _listen(self, client: Client):
Expand Down Expand Up @@ -70,6 +80,16 @@ async def _listen(self, client: Client):
for future in message_futures.values():
future.set_exception(copy.copy(e))
finally:
# message may have Ray ObjectRef, delete it early in case next loop doesn't run
# as soon as expected.
try:
del message
except NameError:
pass
try:
del future
except NameError:
pass
await asyncio.sleep(0)

message_futures = self._client_to_message_futures.get(client)
Expand Down

0 comments on commit 7417139

Please sign in to comment.