[Ray] fix auto scale-in hang (#3043)
chaokunyang committed Jun 8, 2022
1 parent 6ffc7b9 commit 18a3af8
Showing 3 changed files with 82 additions and 24 deletions.
66 changes: 54 additions & 12 deletions mars/deploy/oscar/tests/test_ray_scheduling.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os

import numpy as np
import pytest
@@ -35,6 +37,8 @@

ray = lazy_import("ray")

logger = logging.getLogger(__name__)


@pytest.fixture
async def speculative_cluster():
@@ -224,14 +228,13 @@ async def test_auto_scale_in(ray_large_cluster):
)
while await autoscaler_ref.get_dynamic_worker_nums() > 2:
dynamic_workers = await autoscaler_ref.get_dynamic_workers()
print(f"Waiting workers {dynamic_workers} to be released.")
logger.info("Waiting %s workers to be released.", dynamic_workers)
await asyncio.sleep(1)
await asyncio.sleep(1)
assert await autoscaler_ref.get_dynamic_worker_nums() == 2


@pytest.mark.skip("Enable it when ray ownership bug is fixed")
@pytest.mark.timeout(timeout=200)
@pytest.mark.timeout(timeout=150)
@pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 4}], indirect=True)
@require_ray
@pytest.mark.asyncio
@@ -255,23 +258,62 @@ async def test_ownership_when_scale_in(ray_large_cluster):
uid=AutoscalerActor.default_uid(),
address=client._cluster.supervisor_address,
)
await asyncio.gather(*[autoscaler_ref.request_worker() for _ in range(2)])
df = md.DataFrame(mt.random.rand(100, 4, chunk_size=2), columns=list("abcd"))
print(df.execute())
assert await autoscaler_ref.get_dynamic_worker_nums() > 1
num_chunks, chunk_size = 20, 4
df = md.DataFrame(
mt.random.rand(num_chunks * chunk_size, 4, chunk_size=chunk_size),
columns=list("abcd"),
)
latch_actor = ray.remote(CountDownLatch).remote(1)
pid = os.getpid()

def f(pdf, latch):
if os.getpid() != pid:
# type inference will call this function too
ray.get(latch.wait.remote())
return pdf

df = df.map_chunk(
f,
args=(latch_actor,),
)
info = df.execute(wait=False)
while await autoscaler_ref.get_dynamic_worker_nums() <= 1:
logger.info("Waiting workers to be created.")
await asyncio.sleep(1)
await latch_actor.count_down.remote()
await info
assert info.exception() is None
assert info.progress() == 1
logger.info("df execute succeed.")

while await autoscaler_ref.get_dynamic_worker_nums() > 1:
dynamic_workers = await autoscaler_ref.get_dynamic_workers()
print(f"Waiting workers {dynamic_workers} to be released.")
logger.info("Waiting workers %s to be released.", dynamic_workers)
await asyncio.sleep(1)
# Test that data on nodes of released workers can still be fetched
pd_df = df.to_pandas()
groupby_sum_df = df.rechunk(40).groupby("a").sum()
print(groupby_sum_df.execute())
pd_df = df.fetch()
groupby_sum_df = df.rechunk(chunk_size * 2).groupby("a").sum()
logger.info(groupby_sum_df.execute())
while await autoscaler_ref.get_dynamic_worker_nums() > 1:
dynamic_workers = await autoscaler_ref.get_dynamic_workers()
print(f"Waiting workers {dynamic_workers} to be released.")
logger.info("Waiting workers %s to be released.", dynamic_workers)
await asyncio.sleep(1)
assert df.to_pandas().to_dict() == pd_df.to_dict()
assert (
groupby_sum_df.to_pandas().to_dict() == pd_df.groupby("a").sum().to_dict()
)


class CountDownLatch:
def __init__(self, cnt):
self.cnt = cnt

def count_down(self):
self.cnt -= 1

def get_count(self):
return self.cnt

async def wait(self):
while self.cnt != 0:
await asyncio.sleep(0.01)
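
The test relies on the `CountDownLatch` actor above to hold every chunk's `map_chunk` function until the autoscaler has actually created extra workers, and only then lets the job finish so scale-in can be observed. A minimal, self-contained sketch of the same gating pattern, assuming only a local Ray runtime (the `blocked_task` name is illustrative, not part of this commit):

```python
import asyncio

import ray


class CountDownLatch:
    """Same latch as in the test: wait() blocks until count_down() drops cnt to 0."""

    def __init__(self, cnt):
        self.cnt = cnt

    def count_down(self):
        self.cnt -= 1

    async def wait(self):
        while self.cnt != 0:
            await asyncio.sleep(0.01)


@ray.remote
def blocked_task(latch):
    # Mirrors f() in the test: block inside the task until the driver releases the latch.
    ray.get(latch.wait.remote())
    return "done"


if __name__ == "__main__":
    ray.init()
    latch = ray.remote(CountDownLatch).remote(1)
    ref = blocked_task.remote(latch)
    # ... the driver could now wait for scale-out, then release the task:
    ray.get(latch.count_down.remote())
    assert ray.get(ref) == "done"
```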
11 changes: 9 additions & 2 deletions mars/oscar/backends/ray/pool.py
@@ -307,6 +307,7 @@ async def start(self):
)
self._set_ray_server(self._actor_pool)
self._state = RayPoolState.POOL_READY
logger.info("Started main pool %s with %s processes.", address, n_process)

async def mark_service_ready(self):
results = []
@@ -336,20 +337,25 @@ def set_actor_pool_config(self, actor_pool_config):
self._actor_pool_config = actor_pool_config

async def start(self):
logger.info("Start to init sub pool.")
# The mars pool is created outside the constructor to avoid ray actor creation failures;
# ray can't get the creation exception back from a failed constructor.
main_pool_address, process_index = self._args
logger.info(
"Start to init sub pool %s for main pool %s.",
process_index,
main_pool_address,
)
main_pool = ray.get_actor(main_pool_address)
self._check_alive_task = asyncio.create_task(
self.check_main_pool_alive(main_pool)
)
if self._actor_pool_config is None:
self._actor_pool_config = await main_pool.actor_pool.remote("_config")
pool_config = self._actor_pool_config.get_pool_config(process_index)
sub_pool_address = pool_config["external_address"]
assert (
self._state == RayPoolState.INIT
), f"The pool {pool_config['external_address']} is already started, current state is {self._state}"
), f"The pool {sub_pool_address} is already started, current state is {self._state}"
env = pool_config["env"]
if env: # pragma: no cover
os.environ.update(env)
Expand All @@ -363,6 +369,7 @@ async def start(self):
await self._actor_pool.start()
asyncio.create_task(self._actor_pool.join())
self._state = RayPoolState.POOL_READY
logger.info("Started sub pool %s.", sub_pool_address)

def mark_service_ready(self):
self._state = RayPoolState.SERVICE_READY
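
The comment in `RaySubPool.start` above explains why the mars pool is built in an explicit `start()` call instead of in the Ray actor's constructor: an exception from a normal remote method comes back to the caller through `ray.get`, whereas a failure during actor creation is much harder to observe. A rough sketch of that pattern, with made-up names (`HeavyService`, `build_pool`) and assuming only standard Ray actor semantics:

```python
import ray


def build_pool():
    # Placeholder for the real, failure-prone initialization
    # (e.g. constructing the mars actor pool).
    raise RuntimeError("setup failed")


@ray.remote
class HeavyService:
    def __init__(self):
        # Keep the constructor trivial so actor creation itself cannot fail.
        self._pool = None

    def start(self):
        # Do the risky initialization here; any exception is raised to the
        # caller when it calls ray.get() on this method's object ref.
        self._pool = build_pool()
        return True


if __name__ == "__main__":
    ray.init()
    svc = HeavyService.remote()       # actor handle is returned immediately
    try:
        ray.get(svc.start.remote())   # the setup error surfaces here, catchable
    except ray.exceptions.RayTaskError as exc:
        print("start() failed:", exc)
```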
29 changes: 19 additions & 10 deletions mars/services/scheduling/supervisor/autoscale.py
@@ -26,6 +26,7 @@
from ....lib.aio import alru_cache
from ...cluster.api import ClusterAPI
from ...cluster.core import NodeRole, NodeStatus
from ..errors import NoAvailableBand

logger = logging.getLogger(__name__)

@@ -131,7 +132,10 @@ async def release_worker(address):
self._dynamic_workers.remove(address)
logger.info("Released worker %s.", address)

await asyncio.gather(*[release_worker(address) for address in addresses])
# Release workers one by one so that workers the current one is migrating data to
# are not being released at the same time.
for address in addresses:
await release_worker(address)

def get_dynamic_workers(self) -> Set[str]:
return self._dynamic_workers
Expand Down Expand Up @@ -214,7 +218,7 @@ async def _select_target_band(
if (b[1] == band[1] and b != band and b not in excluded_bands)
)
if not bands: # pragma: no cover
raise RuntimeError(
raise NoAvailableBand(
f"No bands to migrate data to, "
f"all available bands are {all_bands}, "
f"current band is {band}, "
@@ -400,14 +404,19 @@ async def _scale_in(self):
worker_addresses,
idle_bands,
)
# Release workers one by one so that workers the current one is migrating data to
# are not being released at the same time.
await self._autoscaler.release_workers(worker_addresses)
logger.info(
"Finished offline workers %s in %.4f seconds",
worker_addresses,
time.time() - start_time,
)
try:
await self._autoscaler.release_workers(worker_addresses)
logger.info(
"Finished offline workers %s in %.4f seconds",
worker_addresses,
time.time() - start_time,
)
except NoAvailableBand as e: # pragma: no cover
logger.warning(
"Not enough bands, offlining workers %s failed with exception %s.",
worker_addresses,
e,
)

async def stop(self):
self._task.cancel()
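
The behavioral core of the fix is in `release_workers` above: releasing idle workers concurrently with `asyncio.gather` meant a worker could still be migrating its data to a peer that was itself being torn down, which is the scale-in hang this commit addresses; releasing them one at a time keeps every migration target alive until the current worker is done. A stripped-down sketch of the before/after control flow, with placeholder `migrate_data_from` and `stop_worker` coroutines standing in for the real migration and teardown logic:

```python
import asyncio


async def migrate_data_from(address, live_workers):
    # Placeholder: move the worker's data to some peer that is still alive.
    target = next(w for w in live_workers if w != address)
    await asyncio.sleep(0.1)  # pretend to copy chunks to `target`


async def stop_worker(address, live_workers):
    live_workers.remove(address)


# Before the fix (sketch): all releases run concurrently, so a worker chosen as a
# migration target may already be shutting down and the migration never completes.
async def release_workers_concurrently(addresses, live_workers):
    async def release(address):
        await migrate_data_from(address, live_workers)
        await stop_worker(address, live_workers)

    await asyncio.gather(*[release(a) for a in addresses])


# After the fix (sketch): one worker at a time, so every migration target
# is still alive while the current worker's data is being moved.
async def release_workers_sequentially(addresses, live_workers):
    for address in addresses:
        await migrate_data_from(address, live_workers)
        await stop_worker(address, live_workers)
```

The other half of the change, raising and catching `NoAvailableBand` instead of a bare `RuntimeError`, turns "no band left to migrate data to" into a logged warning inside `_scale_in` rather than an unhandled error.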
