Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/install-hadoop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sudo npm uninstall -g yarn || true

sudo apt-get install -yq ssh rsync

VERSION=3.3.1
VERSION=2.10.1
HADOOP_URL="https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=hadoop/common/hadoop-$VERSION/hadoop-$VERSION.tar.gz"

# download hadoop
Expand Down
16 changes: 14 additions & 2 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ async def new_cluster_in_isolation(
config: Union[str, Dict] = None,
web: bool = True,
timeout: float = None,
n_supervisor_process: int = 0,
) -> ClientType:
if subprocess_start_method is None:
subprocess_start_method = "spawn" if sys.platform == "win32" else "forkserver"
cluster = LocalCluster(
address, n_worker, n_cpu, cuda_devices, subprocess_start_method, config, web
address,
n_worker,
n_cpu,
cuda_devices,
subprocess_start_method,
config,
web,
n_supervisor_process,
)
await cluster.start()
return await LocalClient.create(cluster, backend, timeout)
Expand All @@ -77,6 +85,7 @@ async def new_cluster(
web: bool = True,
loop: asyncio.AbstractEventLoop = None,
use_uvloop: Union[bool, str] = "auto",
n_supervisor_process: int = 0,
) -> ClientType:
coro = new_cluster_in_isolation(
address,
Expand All @@ -86,6 +95,7 @@ async def new_cluster(
subprocess_start_method=subprocess_start_method,
config=config,
web=web,
n_supervisor_process=n_supervisor_process,
)
isolation = ensure_isolation_created(dict(loop=loop, use_uvloop=use_uvloop))
fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop)
Expand All @@ -111,6 +121,7 @@ def __init__(
config: Union[str, Dict] = None,
web: Union[bool, str] = "auto",
timeout: float = None,
n_supervisor_process: int = 0,
):
# load third party extensions.
init_extension_entrypoints()
Expand All @@ -121,6 +132,7 @@ def __init__(
self._subprocess_start_method = subprocess_start_method
self._config = config
self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu
self._n_supervisor_process = n_supervisor_process
if cuda_devices == "auto":
total = cuda_count()
all_devices = np.arange(total)
Expand Down Expand Up @@ -188,7 +200,7 @@ async def _start_supervisor_pool(self):
)
self._supervisor_pool = await create_supervisor_actor_pool(
self._address,
n_process=0,
n_process=self._n_supervisor_process,
modules=supervisor_modules,
subprocess_start_method=self._subprocess_start_method,
)
Expand Down
15 changes: 13 additions & 2 deletions mars/deploy/oscar/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,17 @@ async def new_cluster(
ensure_isolation_created(kwargs)
if kwargs: # pragma: no cover
raise TypeError(f"new_cluster got unexpected " f"arguments: {list(kwargs)}")
n_supervisor_process = kwargs.get(
"n_supervisor_process", DEFAULT_SUPERVISOR_SUB_POOL_NUM
)
cluster = RayCluster(
cluster_name, supervisor_mem, worker_num, worker_cpu, worker_mem, config
cluster_name,
supervisor_mem,
worker_num,
worker_cpu,
worker_mem,
config,
n_supervisor_process=n_supervisor_process,
)
try:
await cluster.start()
Expand Down Expand Up @@ -371,11 +380,13 @@ def __init__(
worker_cpu: int = 16,
worker_mem: int = 32 * 1024 ** 3,
config: Union[str, Dict] = None,
n_supervisor_process: int = DEFAULT_SUPERVISOR_SUB_POOL_NUM,
):
# load third party extensions.
init_extension_entrypoints()
self._cluster_name = cluster_name
self._supervisor_mem = supervisor_mem
self._n_supervisor_process = n_supervisor_process
self._worker_num = worker_num
self._worker_cpu = worker_cpu
self._worker_mem = worker_mem
Expand All @@ -402,7 +413,7 @@ async def start(self):
self._config.get("cluster", {})
.get("ray", {})
.get("supervisor", {})
.get("sub_pool_num", DEFAULT_SUPERVISOR_SUB_POOL_NUM)
.get("sub_pool_num", self._n_supervisor_process)
)
from ...storage.ray import support_specify_owner

Expand Down
5 changes: 4 additions & 1 deletion mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1715,7 +1715,10 @@ def _attach_session(future: asyncio.Future):
break
except asyncio.TimeoutError:
# timeout
if not cancelled.is_set():
if (
not cancelled.is_set()
and execution_info.progress() is not None
):
progress_bar.update(execution_info.progress() * 100)
if cancelled.is_set():
# cancel execution
Expand Down
5 changes: 4 additions & 1 deletion mars/deploy/oscar/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def __init__(self):
def config_args(self, parser):
    """Register the supervisor command-line options on ``parser``.

    The parent class registers its options first; this method then adds
    ``-w/--web-port`` and ``--n-process``.  No ``type`` is given for
    ``--n-process``, so its value is kept as a string (default ``"0"``).
    """
    super().config_args(parser)
    register = parser.add_argument
    register("-w", "--web-port", help="web port of the service")
    register("--n-process", default="0", help="number of supervisor processes")

def parse_args(self, parser, argv, environ=None):
args = super().parse_args(parser, argv, environ=environ)
Expand All @@ -52,7 +55,7 @@ def parse_args(self, parser, argv, environ=None):
async def create_actor_pool(self):
return await create_supervisor_actor_pool(
self.args.endpoint,
n_process=0,
n_process=int(self.args.n_process),
ports=self.ports,
modules=self.args.load_modules,
logging_conf=self.logging_conf,
Expand Down
9 changes: 6 additions & 3 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _get_labelled_port(label=None, create=True):
test_name = os.environ["PYTEST_CURRENT_TEST"]
if (test_name, label) not in _test_port_cache:
if create:
_test_port_cache[(test_name, label)] = get_next_port()
_test_port_cache[(test_name, label)] = get_next_port(occupy=True)
else:
return None
return _test_port_cache[(test_name, label)]
Expand All @@ -128,6 +128,8 @@ def _get_labelled_port(label=None, create=True):
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"-w",
lambda: str(_get_labelled_port("web")),
"--n-process",
"2",
],
worker_cmd_start
+ [
Expand All @@ -147,19 +149,20 @@ def _reload_args(args):
return [arg if not callable(arg) else arg() for arg in args]


_rerun_errors = (_ProcessExitedException,) + (
_rerun_errors = (
_ProcessExitedException,
asyncio.TimeoutError,
futures.TimeoutError,
TimeoutError,
)


@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
@pytest.mark.parametrize(
"supervisor_args,worker_args,use_web_addr",
list(start_params.values()),
ids=list(start_params.keys()),
)
@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
new_isolation()
sv_proc = w_procs = None
Expand Down
2 changes: 2 additions & 0 deletions mars/oscar/backends/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ async def _listen(self, client: Client):
self._client_to_message_futures[client] = dict()
for future in message_futures.values():
future.set_exception(e)
finally:
await asyncio.sleep(0)

message_futures = self._client_to_message_futures.get(client)
self._client_to_message_futures[client] = dict()
Expand Down
37 changes: 35 additions & 2 deletions mars/oscar/backends/mars/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ def clear_routers():


@pytest.mark.asyncio
@mock.patch("mars.oscar.backends.mars.pool.SubActorPool.notify_main_pool_to_create")
@mock.patch("mars.oscar.backends.mars.pool.SubActorPool.notify_main_pool_to_destroy")
async def test_sub_actor_pool(notify_main_pool):
notify_main_pool.return_value = None
async def test_sub_actor_pool(notify_main_pool_to_create, notify_main_pool_to_destroy):
notify_main_pool_to_create.return_value = None
notify_main_pool_to_destroy.return_value = None
config = ActorPoolConfig()

ext_address0 = f"127.0.0.1:{get_next_port()}"
Expand Down Expand Up @@ -850,3 +852,34 @@ def get_logger_level(self):
_Actor, allocate_strategy=strategy, address=pool.external_address
)
assert await ref.get_logger_level() == logging.DEBUG


@pytest.mark.asyncio
async def test_ref_sub_pool_actor():
    """Check that an actor created at a sub-pool address can be resolved
    and destroyed through the main pool's external address."""
    # No explicit start method on Windows; elsewhere honour POOL_START_METHOD.
    if sys.platform == "win32":
        start_method = None
    else:
        start_method = os.environ.get("POOL_START_METHOD", "forkserver")

    pool = await create_actor_pool(
        "127.0.0.1",
        pool_cls=MainActorPool,
        n_process=1,
        subprocess_start_method=start_method,
    )

    async with pool:
        ctx = get_context()
        main_address = pool.external_address

        # The first actor is only used to learn the address picked by the
        # RandomSubPool strategy (presumably a sub pool -- confirm).
        seed_ref = await ctx.create_actor(
            TestActor, address=main_address, allocate_strategy=RandomSubPool()
        )
        sub_address = seed_ref.address

        # Create a second actor directly at that address, then look it up
        # via the main pool: the returned ref must point at sub_address.
        direct_ref = await ctx.create_actor(TestActor, address=sub_address)
        resolved_ref = await ctx.actor_ref(direct_ref.uid, address=main_address)
        assert resolved_ref.address == sub_address

        # Destroy through the main address; the actor must be gone from
        # both the main address and the sub address afterwards.
        await ctx.destroy_actor(create_actor_ref(main_address, direct_ref.uid))
        still_on_main = await ctx.has_actor(
            create_actor_ref(main_address, direct_ref.uid)
        )
        assert not still_on_main
        still_on_sub = await ctx.has_actor(
            create_actor_ref(sub_address, direct_ref.uid)
        )
        assert not still_on_sub
12 changes: 11 additions & 1 deletion mars/oscar/backends/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ControlMessageType(Enum):
sync_config = 2
get_config = 3
wait_pool_recovered = 4
add_sub_pool_actor = 5


@dataslots
Expand Down Expand Up @@ -192,7 +193,14 @@ def message_type(self) -> MessageType:


class CreateActorMessage(_MessageBase):
__slots__ = "actor_cls", "actor_id", "args", "kwargs", "allocate_strategy"
__slots__ = (
"actor_cls",
"actor_id",
"args",
"kwargs",
"allocate_strategy",
"from_main",
)

def __init__(
self,
Expand All @@ -202,6 +210,7 @@ def __init__(
args: Tuple,
kwargs: Dict,
allocate_strategy,
from_main: bool = False,
protocol: int = None,
message_trace: List[MessageTraceItem] = None,
):
Expand All @@ -211,6 +220,7 @@ def __init__(
self.args = args
self.kwargs = kwargs
self.allocate_strategy = allocate_strategy
self.from_main = from_main

@classproperty
@implements(_MessageBase.message_type)
Expand Down
37 changes: 32 additions & 5 deletions mars/oscar/backends/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import concurrent.futures as futures
import itertools
import logging
import multiprocessing
import os
import threading
import multiprocessing
from abc import ABC, ABCMeta, abstractmethod
from typing import Dict, List, Type, TypeVar, Coroutine, Callable, Union, Optional

Expand Down Expand Up @@ -473,7 +473,7 @@ async def create_actor(self, message: CreateActorMessage) -> result_message_type
async def has_actor(self, message: HasActorMessage) -> ResultMessage:
result = ResultMessage(
message.message_id,
to_binary(message.actor_ref.uid) in self._actors,
message.actor_ref.uid in self._actors,
protocol=message.protocol,
)
return result
Expand All @@ -497,7 +497,7 @@ async def destroy_actor(self, message: DestroyActorMessage) -> result_message_ty
@implements(AbstractActorPool.actor_ref)
async def actor_ref(self, message: ActorRefMessage) -> result_message_type:
with _ErrorProcessor(message.message_id, message.protocol) as processor:
actor_id = to_binary(message.actor_ref.uid)
actor_id = message.actor_ref.uid
if actor_id not in self._actors:
raise ActorNotExist(f"Actor {actor_id} does not exist")
result = ResultMessage(
Expand Down Expand Up @@ -649,6 +649,22 @@ async def notify_main_pool_to_destroy(
): # pragma: no cover
await self.call(self._main_address, message)

async def notify_main_pool_to_create(self, message: CreateActorMessage):
    """Send an ``add_sub_pool_actor`` control message to the main pool.

    The control message is addressed from this pool's external address and
    carries a tuple of (this pool's external address, the allocation
    strategy of ``message``, ``message`` itself).

    Parameters
    ----------
    message
        the create-actor message to report to the main pool
    """
    message_id = new_message_id()
    own_address = self.external_address
    payload = (own_address, message.allocate_strategy, message)
    notification = ControlMessage(
        message_id,
        own_address,
        ControlMessageType.add_sub_pool_actor,
        payload,
    )
    await self.call(self._main_address, notification)

@implements(AbstractActorPool.create_actor)
async def create_actor(self, message: CreateActorMessage) -> result_message_type:
    # Creation itself is delegated to the parent implementation.
    outcome = await super().create_actor(message)
    if message.from_main:
        # Flagged as coming from the main pool: nothing to report back.
        return outcome
    # Otherwise tell the main pool about the newly requested actor.
    # NOTE(review): `outcome` is not inspected before notifying -- confirm
    # that a failed creation should still be reported to the main pool.
    await self.notify_main_pool_to_create(message)
    return outcome

@implements(AbstractActorPool.actor_ref)
async def actor_ref(self, message: ActorRefMessage) -> result_message_type:
result = await super().actor_ref(message)
Expand Down Expand Up @@ -775,6 +791,7 @@ async def create_actor(self, message: CreateActorMessage) -> result_message_type
message.args,
message.kwargs,
allocate_strategy=new_allocate_strategy,
from_main=True,
protocol=message.protocol,
message_trace=message.message_trace,
)
Expand Down Expand Up @@ -952,6 +969,17 @@ async def handle_control_command(
processor.result = ResultMessage(
message.message_id, True, protocol=message.protocol
)
elif message.control_message_type == ControlMessageType.add_sub_pool_actor:
address, allocate_strategy, create_message = message.content
create_message.from_main = True
ref = create_actor_ref(address, to_binary(create_message.actor_id))
self._allocated_actors[address][ref] = (
allocate_strategy,
create_message,
)
processor.result = ResultMessage(
message.message_id, True, protocol=message.protocol
)
else:
processor.result = await self.call(message.address, message)
return processor.result
Expand Down Expand Up @@ -1114,9 +1142,8 @@ def process_sub_pool_lost(self, address: str):
async def monitor_sub_pools(self):
try:
while not self._stopped.is_set():
for address in self.sub_processes:
for address, process in self.sub_processes.items():
try:
process = self.sub_processes[address]
recover_events_discovered = address in self._recover_events
if not await self.is_sub_pool_alive(
process
Expand Down
2 changes: 2 additions & 0 deletions mars/oscar/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ cdef class ActorRef:
Reference of an Actor at user side
"""
def __init__(self, str address, object uid):
if isinstance(uid, str):
uid = uid.encode()
self.uid = uid
self.address = address
self._methods = dict()
Expand Down
2 changes: 0 additions & 2 deletions mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ async def get_supervisors_by_keys(self, keys: List[str]) -> List[str]:
----------
keys
key for a supervisor address
watch
if True, will watch changes of supervisor changes

Returns
-------
Expand Down
Loading