diff --git a/.github/workflows/install-hadoop.sh b/.github/workflows/install-hadoop.sh index e9b1426bd2..2c6d236178 100755 --- a/.github/workflows/install-hadoop.sh +++ b/.github/workflows/install-hadoop.sh @@ -9,7 +9,7 @@ sudo npm uninstall -g yarn || true sudo apt-get install -yq ssh rsync -VERSION=3.3.1 +VERSION=2.10.1 HADOOP_URL="https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=hadoop/common/hadoop-$VERSION/hadoop-$VERSION.tar.gz" # download hadoop diff --git a/mars/deploy/oscar/local.py b/mars/deploy/oscar/local.py index ed121f406c..2275a92f5b 100644 --- a/mars/deploy/oscar/local.py +++ b/mars/deploy/oscar/local.py @@ -57,11 +57,19 @@ async def new_cluster_in_isolation( config: Union[str, Dict] = None, web: bool = True, timeout: float = None, + n_supervisor_process: int = 0, ) -> ClientType: if subprocess_start_method is None: subprocess_start_method = "spawn" if sys.platform == "win32" else "forkserver" cluster = LocalCluster( - address, n_worker, n_cpu, cuda_devices, subprocess_start_method, config, web + address, + n_worker, + n_cpu, + cuda_devices, + subprocess_start_method, + config, + web, + n_supervisor_process, ) await cluster.start() return await LocalClient.create(cluster, backend, timeout) @@ -77,6 +85,7 @@ async def new_cluster( web: bool = True, loop: asyncio.AbstractEventLoop = None, use_uvloop: Union[bool, str] = "auto", + n_supervisor_process: int = 0, ) -> ClientType: coro = new_cluster_in_isolation( address, @@ -86,6 +95,7 @@ async def new_cluster( subprocess_start_method=subprocess_start_method, config=config, web=web, + n_supervisor_process=n_supervisor_process, ) isolation = ensure_isolation_created(dict(loop=loop, use_uvloop=use_uvloop)) fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop) @@ -111,6 +121,7 @@ def __init__( config: Union[str, Dict] = None, web: Union[bool, str] = "auto", timeout: float = None, + n_supervisor_process: int = 0, ): # load third party extensions. init_extension_entrypoints() @@ -121,6 +132,7 @@ def __init__( self._subprocess_start_method = subprocess_start_method self._config = config self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu + self._n_supervisor_process = n_supervisor_process if cuda_devices == "auto": total = cuda_count() all_devices = np.arange(total) @@ -188,7 +200,7 @@ async def _start_supervisor_pool(self): ) self._supervisor_pool = await create_supervisor_actor_pool( self._address, - n_process=0, + n_process=self._n_supervisor_process, modules=supervisor_modules, subprocess_start_method=self._subprocess_start_method, ) diff --git a/mars/deploy/oscar/ray.py b/mars/deploy/oscar/ray.py index 16ffe27971..7b5be355d3 100644 --- a/mars/deploy/oscar/ray.py +++ b/mars/deploy/oscar/ray.py @@ -304,8 +304,17 @@ async def new_cluster( ensure_isolation_created(kwargs) if kwargs: # pragma: no cover raise TypeError(f"new_cluster got unexpected " f"arguments: {list(kwargs)}") + n_supervisor_process = kwargs.get( + "n_supervisor_process", DEFAULT_SUPERVISOR_SUB_POOL_NUM + ) cluster = RayCluster( - cluster_name, supervisor_mem, worker_num, worker_cpu, worker_mem, config + cluster_name, + supervisor_mem, + worker_num, + worker_cpu, + worker_mem, + config, + n_supervisor_process=n_supervisor_process, ) try: await cluster.start() @@ -371,11 +380,13 @@ def __init__( worker_cpu: int = 16, worker_mem: int = 32 * 1024 ** 3, config: Union[str, Dict] = None, + n_supervisor_process: int = DEFAULT_SUPERVISOR_SUB_POOL_NUM, ): # load third party extensions. init_extension_entrypoints() self._cluster_name = cluster_name self._supervisor_mem = supervisor_mem + self._n_supervisor_process = n_supervisor_process self._worker_num = worker_num self._worker_cpu = worker_cpu self._worker_mem = worker_mem @@ -402,7 +413,7 @@ async def start(self): self._config.get("cluster", {}) .get("ray", {}) .get("supervisor", {}) - .get("sub_pool_num", DEFAULT_SUPERVISOR_SUB_POOL_NUM) + .get("sub_pool_num", self._n_supervisor_process) ) from ...storage.ray import support_specify_owner diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py index a22d0167b1..3a0a23c60c 100644 --- a/mars/deploy/oscar/session.py +++ b/mars/deploy/oscar/session.py @@ -1715,7 +1715,10 @@ def _attach_session(future: asyncio.Future): break except asyncio.TimeoutError: # timeout - if not cancelled.is_set(): + if ( + not cancelled.is_set() + and execution_info.progress() is not None + ): progress_bar.update(execution_info.progress() * 100) if cancelled.is_set(): # cancel execution diff --git a/mars/deploy/oscar/supervisor.py b/mars/deploy/oscar/supervisor.py index 0536d89987..d6f6c871b9 100644 --- a/mars/deploy/oscar/supervisor.py +++ b/mars/deploy/oscar/supervisor.py @@ -32,6 +32,9 @@ def __init__(self): def config_args(self, parser): super().config_args(parser) parser.add_argument("-w", "--web-port", help="web port of the service") + parser.add_argument( + "--n-process", help="number of supervisor processes", default="0" + ) def parse_args(self, parser, argv, environ=None): args = super().parse_args(parser, argv, environ=environ) @@ -52,7 +55,7 @@ def parse_args(self, parser, argv, environ=None): async def create_actor_pool(self): return await create_supervisor_actor_pool( self.args.endpoint, - n_process=0, + n_process=int(self.args.n_process), ports=self.ports, modules=self.args.load_modules, logging_conf=self.logging_conf, diff --git a/mars/deploy/oscar/tests/test_cmdline.py b/mars/deploy/oscar/tests/test_cmdline.py index c185a83ac4..836a5d3b01 100644 --- a/mars/deploy/oscar/tests/test_cmdline.py +++ b/mars/deploy/oscar/tests/test_cmdline.py @@ -103,7 +103,7 @@ def _get_labelled_port(label=None, create=True): test_name = os.environ["PYTEST_CURRENT_TEST"] if (test_name, label) not in _test_port_cache: if create: - _test_port_cache[(test_name, label)] = get_next_port() + _test_port_cache[(test_name, label)] = get_next_port(occupy=True) else: return None return _test_port_cache[(test_name, label)] @@ -128,6 +128,8 @@ def _get_labelled_port(label=None, create=True): lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}', "-w", lambda: str(_get_labelled_port("web")), + "--n-process", + "2", ], worker_cmd_start + [ @@ -147,19 +149,20 @@ def _reload_args(args): return [arg if not callable(arg) else arg() for arg in args] -_rerun_errors = (_ProcessExitedException,) + ( +_rerun_errors = ( + _ProcessExitedException, asyncio.TimeoutError, futures.TimeoutError, TimeoutError, ) -@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors)) @pytest.mark.parametrize( "supervisor_args,worker_args,use_web_addr", list(start_params.values()), ids=list(start_params.keys()), ) +@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors)) def test_cmdline_run(supervisor_args, worker_args, use_web_addr): new_isolation() sv_proc = w_procs = None diff --git a/mars/oscar/backends/core.py b/mars/oscar/backends/core.py index d54b1a5a65..f6b87d9fc6 100644 --- a/mars/oscar/backends/core.py +++ b/mars/oscar/backends/core.py @@ -66,6 +66,8 @@ async def _listen(self, client: Client): self._client_to_message_futures[client] = dict() for future in message_futures.values(): future.set_exception(e) + finally: + await asyncio.sleep(0) message_futures = self._client_to_message_futures.get(client) self._client_to_message_futures[client] = dict() diff --git a/mars/oscar/backends/mars/tests/test_pool.py b/mars/oscar/backends/mars/tests/test_pool.py index 6d86bd1785..2ff3ba3dc3 100644 --- a/mars/oscar/backends/mars/tests/test_pool.py +++ b/mars/oscar/backends/mars/tests/test_pool.py @@ -116,9 +116,11 @@ def clear_routers(): @pytest.mark.asyncio +@mock.patch("mars.oscar.backends.mars.pool.SubActorPool.notify_main_pool_to_create") @mock.patch("mars.oscar.backends.mars.pool.SubActorPool.notify_main_pool_to_destroy") -async def test_sub_actor_pool(notify_main_pool): - notify_main_pool.return_value = None +async def test_sub_actor_pool(notify_main_pool_to_create, notify_main_pool_to_destroy): + notify_main_pool_to_create.return_value = None + notify_main_pool_to_destroy.return_value = None config = ActorPoolConfig() ext_address0 = f"127.0.0.1:{get_next_port()}" @@ -850,3 +852,34 @@ def get_logger_level(self): _Actor, allocate_strategy=strategy, address=pool.external_address ) assert await ref.get_logger_level() == logging.DEBUG + + +@pytest.mark.asyncio +async def test_ref_sub_pool_actor(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + pool = await create_actor_pool( + "127.0.0.1", + pool_cls=MainActorPool, + n_process=1, + subprocess_start_method=start_method, + ) + + async with pool: + ctx = get_context() + ref1 = await ctx.create_actor( + TestActor, address=pool.external_address, allocate_strategy=RandomSubPool() + ) + sub_address = ref1.address + ref2 = await ctx.create_actor(TestActor, address=sub_address) + ref2_main = await ctx.actor_ref(ref2.uid, address=pool.external_address) + assert ref2_main.address == sub_address + + await ctx.destroy_actor(create_actor_ref(pool.external_address, ref2.uid)) + assert not await ctx.has_actor( + create_actor_ref(pool.external_address, ref2.uid) + ) + assert not await ctx.has_actor(create_actor_ref(sub_address, ref2.uid)) diff --git a/mars/oscar/backends/message.py b/mars/oscar/backends/message.py index 00877be2fa..27fa5f43fd 100644 --- a/mars/oscar/backends/message.py +++ b/mars/oscar/backends/message.py @@ -59,6 +59,7 @@ class ControlMessageType(Enum): sync_config = 2 get_config = 3 wait_pool_recovered = 4 + add_sub_pool_actor = 5 @dataslots @@ -192,7 +193,14 @@ def message_type(self) -> MessageType: class CreateActorMessage(_MessageBase): - __slots__ = "actor_cls", "actor_id", "args", "kwargs", "allocate_strategy" + __slots__ = ( + "actor_cls", + "actor_id", + "args", + "kwargs", + "allocate_strategy", + "from_main", + ) def __init__( self, @@ -202,6 +210,7 @@ def __init__( args: Tuple, kwargs: Dict, allocate_strategy, + from_main: bool = False, protocol: int = None, message_trace: List[MessageTraceItem] = None, ): @@ -211,6 +220,7 @@ def __init__( self.args = args self.kwargs = kwargs self.allocate_strategy = allocate_strategy + self.from_main = from_main @classproperty @implements(_MessageBase.message_type) diff --git a/mars/oscar/backends/pool.py b/mars/oscar/backends/pool.py index 846d77f0f7..bbc274090a 100644 --- a/mars/oscar/backends/pool.py +++ b/mars/oscar/backends/pool.py @@ -16,9 +16,9 @@ import concurrent.futures as futures import itertools import logging +import multiprocessing import os import threading -import multiprocessing from abc import ABC, ABCMeta, abstractmethod from typing import Dict, List, Type, TypeVar, Coroutine, Callable, Union, Optional @@ -473,7 +473,7 @@ async def create_actor(self, message: CreateActorMessage) -> result_message_type async def has_actor(self, message: HasActorMessage) -> ResultMessage: result = ResultMessage( message.message_id, - to_binary(message.actor_ref.uid) in self._actors, + message.actor_ref.uid in self._actors, protocol=message.protocol, ) return result @@ -497,7 +497,7 @@ async def destroy_actor(self, message: DestroyActorMessage) -> result_message_ty @implements(AbstractActorPool.actor_ref) async def actor_ref(self, message: ActorRefMessage) -> result_message_type: with _ErrorProcessor(message.message_id, message.protocol) as processor: - actor_id = to_binary(message.actor_ref.uid) + actor_id = message.actor_ref.uid if actor_id not in self._actors: raise ActorNotExist(f"Actor {actor_id} does not exist") result = ResultMessage( @@ -649,6 +649,22 @@ async def notify_main_pool_to_destroy( ): # pragma: no cover await self.call(self._main_address, message) + async def notify_main_pool_to_create(self, message: CreateActorMessage): + reg_message = ControlMessage( + new_message_id(), + self.external_address, + ControlMessageType.add_sub_pool_actor, + (self.external_address, message.allocate_strategy, message), + ) + await self.call(self._main_address, reg_message) + + @implements(AbstractActorPool.create_actor) + async def create_actor(self, message: CreateActorMessage) -> result_message_type: + result = await super().create_actor(message) + if not message.from_main: + await self.notify_main_pool_to_create(message) + return result + @implements(AbstractActorPool.actor_ref) async def actor_ref(self, message: ActorRefMessage) -> result_message_type: result = await super().actor_ref(message) @@ -775,6 +791,7 @@ async def create_actor(self, message: CreateActorMessage) -> result_message_type message.args, message.kwargs, allocate_strategy=new_allocate_strategy, + from_main=True, protocol=message.protocol, message_trace=message.message_trace, ) @@ -952,6 +969,17 @@ async def handle_control_command( processor.result = ResultMessage( message.message_id, True, protocol=message.protocol ) + elif message.control_message_type == ControlMessageType.add_sub_pool_actor: + address, allocate_strategy, create_message = message.content + create_message.from_main = True + ref = create_actor_ref(address, to_binary(create_message.actor_id)) + self._allocated_actors[address][ref] = ( + allocate_strategy, + create_message, + ) + processor.result = ResultMessage( + message.message_id, True, protocol=message.protocol + ) else: processor.result = await self.call(message.address, message) return processor.result @@ -1114,9 +1142,8 @@ def process_sub_pool_lost(self, address: str): async def monitor_sub_pools(self): try: while not self._stopped.is_set(): - for address in self.sub_processes: + for address, process in self.sub_processes.items(): try: - process = self.sub_processes[address] recover_events_discovered = address in self._recover_events if not await self.is_sub_pool_alive( process diff --git a/mars/oscar/core.pyx b/mars/oscar/core.pyx index 4725275044..91dfb4d151 100644 --- a/mars/oscar/core.pyx +++ b/mars/oscar/core.pyx @@ -48,6 +48,8 @@ cdef class ActorRef: Reference of an Actor at user side """ def __init__(self, str address, object uid): + if isinstance(uid, str): + uid = uid.encode() self.uid = uid self.address = address self._methods = dict() diff --git a/mars/services/cluster/api/oscar.py b/mars/services/cluster/api/oscar.py index 99b6b59b10..a943280b95 100644 --- a/mars/services/cluster/api/oscar.py +++ b/mars/services/cluster/api/oscar.py @@ -82,8 +82,6 @@ async def get_supervisors_by_keys(self, keys: List[str]) -> List[str]: ---------- keys key for a supervisor address - watch - if True, will watch changes of supervisor changes Returns ------- diff --git a/mars/services/lifecycle/api/web.py b/mars/services/lifecycle/api/web.py index b601361001..d159ecb3d4 100644 --- a/mars/services/lifecycle/api/web.py +++ b/mars/services/lifecycle/api/web.py @@ -14,7 +14,6 @@ from typing import Dict, List -from ....lib.aio import alru_cache from ....utils import serialize_serializable, deserialize_serializable from ...web import web_api, MarsServiceWebAPIHandler, MarsWebAPIClientMixin from .core import AbstractLifecycleAPI @@ -23,19 +22,10 @@ class LifecycleWebAPIHandler(MarsServiceWebAPIHandler): _root_pattern = "/api/session/(?P[^/]+)/lifecycle" - @alru_cache(cache_exceptions=False) - async def _get_cluster_api(self): - from ...cluster import ClusterAPI - - return await ClusterAPI.create(self._supervisor_addr) - - @alru_cache(cache_exceptions=False) async def _get_oscar_lifecycle_api(self, session_id: str): from .oscar import LifecycleAPI - cluster_api = await self._get_cluster_api() - [address] = await cluster_api.get_supervisors_by_keys([session_id]) - return await LifecycleAPI.create(session_id, address) + return await self._get_api_by_key(LifecycleAPI, session_id) @web_api("", method="post", arg_filter={"action": "decref_tileables"}) async def decref_tileables(self, session_id: str): diff --git a/mars/services/meta/api/web.py b/mars/services/meta/api/web.py index 65e40f9f98..1ff60b3158 100644 --- a/mars/services/meta/api/web.py +++ b/mars/services/meta/api/web.py @@ -15,7 +15,6 @@ from typing import Dict, List, Optional from .... import oscar as mo -from ....lib.aio import alru_cache from ....utils import serialize_serializable, deserialize_serializable from ...web import web_api, MarsServiceWebAPIHandler, MarsWebAPIClientMixin from .core import AbstractMetaAPI @@ -24,19 +23,10 @@ class MetaWebAPIHandler(MarsServiceWebAPIHandler): _root_pattern = "/api/session/(?P[^/]+)/meta" - @alru_cache(cache_exceptions=False) - async def _get_cluster_api(self): - from ...cluster import ClusterAPI - - return await ClusterAPI.create(self._supervisor_addr) - - @alru_cache(cache_exceptions=False) async def _get_oscar_meta_api(self, session_id: str): from .oscar import MetaAPI - cluster_api = await self._get_cluster_api() - [address] = await cluster_api.get_supervisors_by_keys([session_id]) - return await MetaAPI.create(session_id, address) + return await self._get_api_by_key(MetaAPI, session_id) @web_api("(?P[^/]+)", method="get") async def get_chunk_meta(self, session_id: str, data_key: str): diff --git a/mars/services/session/api/web.py b/mars/services/session/api/web.py index 1198654794..6f6dba1bdf 100644 --- a/mars/services/session/api/web.py +++ b/mars/services/session/api/web.py @@ -15,7 +15,6 @@ import json from typing import Dict, List, Union -from ....lib.aio import alru_cache from ....utils import parse_readable_size from ...web import web_api, MarsServiceWebAPIHandler, MarsWebAPIClientMixin from ..core import SessionInfo @@ -46,19 +45,10 @@ def _decode_size(encoded: str) -> Union[int, str, Dict[str, Union[int, List[int] class SessionWebAPIBaseHandler(MarsServiceWebAPIHandler): - @alru_cache(cache_exceptions=False) - async def _get_cluster_api(self): - from ...cluster import ClusterAPI - - return await ClusterAPI.create(self._supervisor_addr) - - @alru_cache(cache_exceptions=False) async def _get_oscar_session_api(self): from .oscar import SessionAPI - cluster_api = await self._get_cluster_api() - [address] = await cluster_api.get_supervisors_by_keys(["Session"]) - return await SessionAPI.create(address) + return await self._get_api_by_key(SessionAPI, "Session", with_key_arg=False) class SessionWebAPIHandler(SessionWebAPIBaseHandler): diff --git a/mars/services/session/supervisor/core.py b/mars/services/session/supervisor/core.py index c5e57af16a..55c6d111d9 100644 --- a/mars/services/session/supervisor/core.py +++ b/mars/services/session/supervisor/core.py @@ -42,14 +42,26 @@ async def create_session(self, session_id: str, create_services: bool = True): raise mo.Return(self._session_refs[session_id]) [address] = await self._cluster_api.get_supervisors_by_keys([session_id]) - session_actor_ref = await mo.create_actor( - SessionActor, - session_id, - self._service_config, - address=address, - uid=SessionActor.gen_uid(session_id), - allocate_strategy=mo.allocate_strategy.Random(), - ) + try: + session_actor_ref = await mo.create_actor( + SessionActor, + session_id, + self._service_config, + address=address, + uid=SessionActor.gen_uid(session_id), + allocate_strategy=mo.allocate_strategy.RandomSubPool(), + ) + except IndexError: + # when there is only one supervisor process, strategy RandomSubPool + # fails with IndexError. So we need to retry using strategy Random. + session_actor_ref = await mo.create_actor( + SessionActor, + session_id, + self._service_config, + address=address, + uid=SessionActor.gen_uid(session_id), + allocate_strategy=mo.allocate_strategy.Random(), + ) self._session_refs[session_id] = session_actor_ref # sync ref to other managers diff --git a/mars/services/storage/api/web.py b/mars/services/storage/api/web.py index 2ab73fc67f..f59d50f55b 100644 --- a/mars/services/storage/api/web.py +++ b/mars/services/storage/api/web.py @@ -16,7 +16,6 @@ from typing import Any, List from .... import oscar as mo -from ....lib.aio import alru_cache from ....storage import StorageLevel from ....utils import serialize_serializable, deserialize_serializable from ...web import web_api, MarsServiceWebAPIHandler, MarsWebAPIClientMixin @@ -27,19 +26,10 @@ class StorageWebAPIHandler(MarsServiceWebAPIHandler): _root_pattern = "/api/session/(?P[^/]+)/storage" - @alru_cache(cache_exceptions=False) - async def _get_cluster_api(self): - from ...cluster import ClusterAPI - - return await ClusterAPI.create(self._supervisor_addr) - - @alru_cache(cache_exceptions=False) async def _get_oscar_meta_api(self, session_id: str): from ...meta import MetaAPI - cluster_api = await self._get_cluster_api() - [address] = await cluster_api.get_supervisors_by_keys([session_id]) - return await MetaAPI.create(session_id, address) + return await self._get_api_by_key(MetaAPI, session_id) async def _get_storage_api_by_object_id(self, session_id: str, object_id: str): from .oscar import StorageAPI diff --git a/mars/services/subtask/worker/runner.py b/mars/services/subtask/worker/runner.py index 1f98dc1b7c..24b6a31d25 100644 --- a/mars/services/subtask/worker/runner.py +++ b/mars/services/subtask/worker/runner.py @@ -94,16 +94,24 @@ async def run_subtask(self, subtask: Subtask): session_id = subtask.session_id supervisor_address = await self._get_supervisor_address(session_id) if session_id not in self._session_id_to_processors: - self._session_id_to_processors[session_id] = await mo.create_actor( - SubtaskProcessorActor, - session_id, - self._band, - supervisor_address, - self._worker_address, - self._subtask_processor_cls, - uid=SubtaskProcessorActor.gen_uid(session_id), - address=self.address, - ) + try: + self._session_id_to_processors[session_id] = await mo.create_actor( + SubtaskProcessorActor, + session_id, + self._band, + supervisor_address, + self._worker_address, + self._subtask_processor_cls, + uid=SubtaskProcessorActor.gen_uid(session_id), + address=self.address, + ) + except mo.ActorAlreadyExist: + # when recovering actor pools, the actor created in sub pools + # may be recovered already + self._session_id_to_processors[session_id] = await mo.actor_ref( + uid=SubtaskProcessorActor.gen_uid(session_id), + address=self.address, + ) processor = self._session_id_to_processors[session_id] self._running_processor = self._last_processor = processor try: diff --git a/mars/services/task/api/web.py b/mars/services/task/api/web.py index 585e7dbdae..4162138567 100644 --- a/mars/services/task/api/web.py +++ b/mars/services/task/api/web.py @@ -17,7 +17,6 @@ from typing import List, Optional, Union from ....core import TileableGraph, Tileable -from ....lib.aio import alru_cache from ....utils import serialize_serializable, deserialize_serializable from ...web import web_api, MarsServiceWebAPIHandler, MarsWebAPIClientMixin from ..core import TaskResult, TaskStatus @@ -59,19 +58,10 @@ def _json_deserial_task_result(d: dict) -> Optional[TaskResult]: class TaskWebAPIHandler(MarsServiceWebAPIHandler): _root_pattern = "/api/session/(?P[^/]+)/task" - @alru_cache(cache_exceptions=False) - async def _get_cluster_api(self): - from ...cluster import ClusterAPI - - return await ClusterAPI.create(self._supervisor_addr) - - @alru_cache(cache_exceptions=False) async def _get_oscar_task_api(self, session_id: str): from .oscar import TaskAPI - cluster_api = await self._get_cluster_api() - [address] = await cluster_api.get_supervisors_by_keys([session_id]) - return await TaskAPI.create(session_id, address) + return await self._get_api_by_key(TaskAPI, session_id) @web_api("", method="post") async def submit_tileable_graph(self, session_id: str): diff --git a/mars/services/web/core.py b/mars/services/web/core.py index 085a19240c..ae11646134 100644 --- a/mars/services/web/core.py +++ b/mars/services/web/core.py @@ -20,11 +20,12 @@ import sys import urllib.parse from collections import defaultdict -from typing import Callable, Dict, List, NamedTuple, Optional, Union +from typing import Callable, Dict, List, NamedTuple, Optional, Type, Union from tornado import httpclient, web from tornado.simple_httpclient import HTTPTimeoutError +from ...lib.aio import alru_cache from ...utils import serialize_serializable, deserialize_serializable if sys.version_info[:2] == (3, 6): @@ -81,6 +82,25 @@ async def wrapped(self, *args, **kwargs): return wrapper +@alru_cache(cache_exceptions=False) +async def _get_cluster_api(address: str): + from ..cluster import ClusterAPI + + return await ClusterAPI.create(address) + + +@alru_cache(cache_exceptions=False) +async def _get_api_by_key( + api_cls: Type, session_id: str, address: str, with_key_arg: bool = True +): + cluster_api = await _get_cluster_api(address) + [address] = await cluster_api.get_supervisors_by_keys([session_id]) + if with_key_arg: + return await api_cls.create(session_id, address) + else: + return await api_cls.create(address) + + class MarsServiceWebAPIHandler(MarsRequestHandler): _root_pattern = None _method_to_handlers = None @@ -89,6 +109,16 @@ def __init__(self, *args, **kwargs): self._collect_services() super().__init__(*args, **kwargs) + def _get_api_by_key( + self, api_cls: Type, session_id: str, with_key_arg: bool = True + ): + return _get_api_by_key( + api_cls, + session_id, + address=self._supervisor_addr, + with_key_arg=with_key_arg, + ) + @classmethod def _collect_services(cls): if cls._method_to_handlers is not None: diff --git a/mars/services/web/ui/package-lock.json b/mars/services/web/ui/package-lock.json index af0d13b4a1..68ed708895 100644 --- a/mars/services/web/ui/package-lock.json +++ b/mars/services/web/ui/package-lock.json @@ -2261,9 +2261,9 @@ } }, "node_modules/ansi-regex": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.0.tgz", - "integrity": "sha512-bY6fj56OUQ0hU1KjFNDQuJFezqKdrAyFdIevADiqrWHwSlbmBNMHp5ak2f40Pm8JTFyM2mqxkG6ngkHO11f/lg==", + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.1.tgz", + "integrity": "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==", "dev": true, "engines": { "node": ">=8" @@ -9200,9 +9200,9 @@ "dev": true }, "ansi-regex": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.0.tgz", - "integrity": "sha512-bY6fj56OUQ0hU1KjFNDQuJFezqKdrAyFdIevADiqrWHwSlbmBNMHp5ak2f40Pm8JTFyM2mqxkG6ngkHO11f/lg==", + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.1.tgz", + "integrity": "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==", "dev": true }, "ansi-styles": {