Commit

Assign bands given devices of subtasks (#2276)
wjsi committed Aug 2, 2021
1 parent 0f3c8c9 commit 92cd6c8
Showing 13 changed files with 215 additions and 88 deletions.
4 changes: 4 additions & 0 deletions mars/core/base.py
@@ -122,3 +122,7 @@ def serialize(self, obj: Serializable, context: Dict):


BaseSerializer.register(Base)


class MarsError(Exception):
pass
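
Editorial note, not part of the commit: the snippet below sketches why a shared exception base is useful. Because Mars-specific exceptions (for example ActorPoolNotStarted in mars/oscar/errors.py below) now derive from mars.core.base.MarsError, framework errors can be caught uniformly without enumerating every subclass.

# hypothetical usage of the shared MarsError base introduced above
from mars.core.base import MarsError
from mars.oscar.errors import ActorPoolNotStarted

def call_into_mars():
    try:
        ...  # some Mars / oscar operation
    except ActorPoolNotStarted:
        ...  # handle one specific, expected failure
    except MarsError:
        # any other framework-defined error, distinct from arbitrary exceptions
        raise
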
4 changes: 3 additions & 1 deletion mars/deploy/oscar/tests/test_fault_injection.py
@@ -20,11 +20,13 @@
from mars.remote import spawn
from mars.deploy.oscar.local import new_cluster
from mars.deploy.oscar.session import get_default_async_session
from mars.oscar.errors import FaultInjectionError, FaultInjectionUnhandledError, ServerClosed
from mars.oscar.errors import ServerClosed
from mars.services.tests.fault_injection_manager import (
FaultType,
AbstractFaultInjectionManager,
ExtraConfigKey,
FaultInjectionError,
FaultInjectionUnhandledError
)

CONFIG_FILE = os.path.join(
4 changes: 3 additions & 1 deletion mars/oscar/backends/communication/errors.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ....core.base import MarsError

class ChannelClosed(Exception):

class ChannelClosed(MarsError):
pass
12 changes: 1 addition & 11 deletions mars/oscar/errors.py
@@ -14,9 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


class MarsError(Exception):
pass
from mars.core.base import MarsError


class ActorPoolNotStarted(MarsError):
@@ -46,11 +44,3 @@ class CannotCancelTask(MarsError):
class Return(MarsError):
def __init__(self, value):
self.value = value


class FaultInjectionError(MarsError):
pass


class FaultInjectionUnhandledError(Exception):
pass
4 changes: 3 additions & 1 deletion mars/services/lifecycle/errors.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ...core.base import MarsError

class TileableNotTracked(Exception):

class TileableNotTracked(MarsError):
pass
23 changes: 23 additions & 0 deletions mars/services/scheduling/errors.py
@@ -0,0 +1,23 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ...core.base import MarsError


class NoMatchingSlots(MarsError):
def __init__(self, slot_prefix):
self.slot_prefix = slot_prefix

def __str__(self):
return str(self.slot_prefix)
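
The following standalone sketch (not part of the commit) shows when NoMatchingSlots is raised. Bands are (worker_address, band_name) tuples; CPU bands are named with a 'numa' prefix and GPU bands with a 'gpu' prefix, mirroring _get_device_bands in the assigner below. The worker address is made up for illustration.

# hypothetical reproduction of the device filtering that raises NoMatchingSlots
from typing import List, Tuple

from mars.services.scheduling.errors import NoMatchingSlots

def get_device_bands(bands: List[Tuple[str, str]], is_gpu: bool):
    band_prefix = 'gpu' if is_gpu else 'numa'
    filtered = [band for band in bands if band[1].startswith(band_prefix)]
    if not filtered:
        raise NoMatchingSlots('gpu' if is_gpu else 'cpu')
    return filtered

bands = [('127.0.0.1:17310', 'numa-0'), ('127.0.0.1:17310', 'numa-1')]
get_device_bands(bands, is_gpu=False)  # returns both numa bands
get_device_bands(bands, is_gpu=True)   # raises NoMatchingSlots('gpu')
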
162 changes: 107 additions & 55 deletions mars/services/scheduling/supervisor/assigner.py
@@ -13,17 +13,22 @@
# limitations under the License.

import asyncio
import itertools
import random
from collections import defaultdict
from typing import List, Dict, Tuple
from typing import Dict, List

from .... import oscar as mo
from ....core.operand import Fetch, FetchShuffle
from ....typing import BandType
from ...core import NodeRole
from ...subtask import Subtask
from ..errors import NoMatchingSlots


class AssignerActor(mo.Actor):
_bands: List[BandType]

@classmethod
def gen_uid(cls, session_id: str):
return f'{session_id}_assigner'
@@ -36,6 +41,8 @@ def __init__(self, session_id: str):
self._meta_api = None

self._bands = []
self._address_to_bands = dict()
self._device_type_to_bands = dict()
self._band_watch_task = None

async def __post_create__(self):
@@ -47,18 +54,39 @@ async def __post_create__(self):

async def watch_bands():
async for bands in self._cluster_api.watch_all_bands(NodeRole.WORKER):
self._bands = list(bands)
self._update_bands(list(bands))

self._band_watch_task = asyncio.create_task(watch_bands())

async def __pre_destroy__(self):
if self._band_watch_task is not None: # pragma: no branch
self._band_watch_task.cancel()

def _update_bands(self, bands: List[BandType]):
self._bands = bands

grouped_bands = itertools.groupby(sorted(self._bands), key=lambda b: b[0])
self._address_to_bands = {k: list(v) for k, v in grouped_bands}

grouped_bands = itertools.groupby(
sorted(('cpu' if b[1].startswith('numa') else 'gpu', b) for b in bands),
key=lambda tp: tp[0]
)
self._device_type_to_bands = {k: [v[1] for v in tps] for k, tps in grouped_bands}

@staticmethod
def _get_device_bands(bands: List[BandType], is_gpu: bool):
band_prefix = 'numa' if not is_gpu else 'gpu'
filtered_bands = [band for band in bands if band[1].startswith(band_prefix)]
if not filtered_bands:
raise NoMatchingSlots('gpu' if is_gpu else 'cpu')
return filtered_bands

async def assign_subtasks(self, subtasks: List[Subtask]):
inp_keys = set()
selected_bands = dict()
for subtask in subtasks:
is_gpu = any(c.op.gpu for c in subtask.chunk_graph.result_chunks)
if subtask.expect_bands:
if all(expect_band in self._bands
for expect_band in subtask.expect_bands):
@@ -71,17 +99,17 @@ async def assign_subtasks(self, subtasks: List[Subtask]):
if expect_band in self._bands]
# fill in if all expected bands are unready
if not expect_available_bands:
expect_available_bands = [self.reassign_band()]
expect_available_bands = [self.get_random_band(is_gpu)]
selected_bands[subtask.subtask_id] = expect_available_bands
continue
for indep_chunk in subtask.chunk_graph.iter_indep():
if isinstance(indep_chunk.op, Fetch):
inp_keys.add(indep_chunk.key)
elif isinstance(indep_chunk.op, FetchShuffle):
if not self._bands:
self._bands = list(await self._cluster_api.get_all_bands(
NodeRole.WORKER))
selected_bands[subtask.subtask_id] = [self.reassign_band()]
self._update_bands(list(await self._cluster_api.get_all_bands(
NodeRole.WORKER)))
selected_bands[subtask.subtask_id] = [self.get_random_band(is_gpu)]
break

fields = ['store_size', 'bands']
@@ -93,6 +121,10 @@ async def assign_subtasks(self, subtasks: List[Subtask]):
inp_metas = dict(zip(inp_keys, metas))
assigns = []
for subtask in subtasks:
is_gpu = any(c.op.gpu for c in subtask.chunk_graph.result_chunks)
band_prefix = 'numa' if not is_gpu else 'gpu'
filtered_bands = self._get_device_bands(self._bands, is_gpu)

if subtask.subtask_id in selected_bands:
bands = selected_bands[subtask.subtask_id]
else:
@@ -102,8 +134,13 @@ async def assign_subtasks(self, subtasks: List[Subtask]):
continue
meta = inp_metas[inp.key]
for band in meta['bands']:
if band not in self._bands:
band = self.reassign_band()
if not band[1].startswith(band_prefix):
sel_bands = [b for b in self._address_to_bands[band[0]]
if b[1].startswith(band_prefix)]
if sel_bands:
band = (band[0], random.choice(sel_bands))
if band not in filtered_bands:
band = self.get_random_band(is_gpu)
band_sizes[band] += meta['store_size']
bands = []
max_size = -1
@@ -116,52 +153,67 @@ async def assign_subtasks(self, subtasks: List[Subtask]):
assigns.append(random.choice(bands))
return assigns

def reassign_band(self):
assert self._bands
return random.choice(self._bands)

async def reassign_subtasks(self, band_num_queued_subtasks: Dict[Tuple, int]) -> Dict[Tuple, int]:
num_used_bands = len(band_num_queued_subtasks.keys())
if num_used_bands == 1:
[(band, length)] = band_num_queued_subtasks.items()
if length == 0:
return {band: 0}
# no need to balance when there's only one band initially
if len(self._bands) == 1 and band == self._bands[0]:
return {band: 0}
# unready bands recorded in band_num_queued_subtasks, some of them may hold 0 subtasks
unready_bands = list(set(band_num_queued_subtasks.keys()) - set(self._bands))
# ready bands not recorded in band_num_queued_subtasks, all of them hold 0 subtasks
new_ready_bands = list(set(self._bands) - set(band_num_queued_subtasks.keys()))
# when there are new ready bands, make all bands hold same amount of subtasks
# when there are no new ready bands now, move out subtasks left on them
if not new_ready_bands and unready_bands:
band_num_queued_subtasks = {k: band_num_queued_subtasks[k] for k in unready_bands}
# approximate total of subtasks moving to each ready band
num_all_subtasks = sum(band_num_queued_subtasks.values())
mean = int(num_all_subtasks / len(self._bands))
# all_bands (namely) includes:
# a. ready bands recorded in band_num_queued_subtasks
# b. ready bands not recorded in band_num_queued_subtasks
# c. unready bands recorded in band_num_queued_subtasks
# a. + b. = self._bands, a. + c. = bands in band_num_queued_subtasks
all_bands = list(set(self._bands) | set(band_num_queued_subtasks.keys()))
# calculate the differential steps of moving subtasks
# move < 0 means subtasks should move out and vice versa
# unready bands no longer hold subtasks
# assuming bands not recorded in band_num_queued_subtasks hold 0 subtasks
def get_random_band(self, is_gpu: bool):
avail_bands = self._get_device_bands(self._bands, is_gpu)
return random.choice(avail_bands)

async def reassign_subtasks(self, band_to_queued_num: Dict[BandType, int]) \
-> Dict[BandType, int]:
move_queued_subtasks = {}
for band in all_bands:
if band in self._bands:
move_queued_subtasks[band] = mean - band_num_queued_subtasks.get(band, 0)
else:
move_queued_subtasks[band] = -band_num_queued_subtasks.get(band, 0)
# ensure the balance of moving in and out
total_move = sum(move_queued_subtasks.values())
# int() is going to be closer to zero, so `mean` is no more than actual mean value
# total_move = mean * len(self._bands) - num_all_subtasks
# <= actual_mean * len(self._bands) - num_all_subtasks = 0
assert total_move <= 0
if total_move != 0:
move_queued_subtasks[self.reassign_band()] -= total_move
for is_gpu in (False, True):
band_name_prefix = 'numa' if not is_gpu else 'gpu'

filtered_bands = [band for band in self._bands if band[1].startswith(band_name_prefix)]
filtered_band_to_queued_num = {k: v for k, v in band_to_queued_num.items()
if k[1].startswith(band_name_prefix)}

if not filtered_bands:
continue

num_used_bands = len(filtered_band_to_queued_num.keys())
if num_used_bands == 1:
[(band, length)] = filtered_band_to_queued_num.items()
if length == 0:
move_queued_subtasks.update({band: 0})
continue
# no need to balance when there's only one band initially
if len(filtered_bands) == 1 and band == filtered_bands[0]:
move_queued_subtasks.update({band: 0})
continue
# unready bands recorded in band_num_queued_subtasks, some of them may hold 0 subtasks
unready_bands = list(set(filtered_band_to_queued_num.keys()) - set(filtered_bands))
# ready bands not recorded in band_num_queued_subtasks, all of them hold 0 subtasks
new_ready_bands = list(set(filtered_bands) - set(filtered_band_to_queued_num.keys()))
# when there are new ready bands, make all bands hold same amount of subtasks
# when there are no new ready bands now, move out subtasks left on them
if not new_ready_bands and unready_bands:
filtered_band_to_queued_num = {k: filtered_band_to_queued_num[k] for k in unready_bands}
# approximate total of subtasks moving to each ready band
num_all_subtasks = sum(filtered_band_to_queued_num.values())
mean = int(num_all_subtasks / len(filtered_bands))
# all_bands (namely) includes:
# a. ready bands recorded in band_num_queued_subtasks
# b. ready bands not recorded in band_num_queued_subtasks
# c. unready bands recorded in band_num_queued_subtasks
# a. + b. = self._bands, a. + c. = bands in band_num_queued_subtasks
all_bands = list(set(filtered_bands) | set(filtered_band_to_queued_num.keys()))
# calculate the differential steps of moving subtasks
# move < 0 means subtasks should move out and vice versa
# unready bands no longer hold subtasks
# assuming bands not recorded in band_num_queued_subtasks hold 0 subtasks
band_move_nums = {}
for band in all_bands:
if band in filtered_bands:
band_move_nums[band] = mean - filtered_band_to_queued_num.get(band, 0)
else:
band_move_nums[band] = -filtered_band_to_queued_num.get(band, 0)
# ensure the balance of moving in and out
total_move = sum(band_move_nums.values())
# int() is going to be closer to zero, so `mean` is no more than actual mean value
# total_move = mean * len(self._bands) - num_all_subtasks
# <= actual_mean * len(self._bands) - num_all_subtasks = 0
assert total_move <= 0
if total_move != 0:
band_move_nums[self.get_random_band(False)] -= total_move
move_queued_subtasks.update(band_move_nums)
return dict(sorted(move_queued_subtasks.items(), key=lambda item: item[1]))
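
As an editorial aside (not part of the diff), the snippet below walks through the grouping performed by _update_bands above, using a hypothetical band list: bands are grouped once by worker address and once by device type, where 'numa-*' bands count as CPU and everything else as GPU.

# standalone sketch of the grouping in _update_bands;
# bands are (worker_address, band_name) tuples with made-up addresses
import itertools

bands = [
    ('worker-a:1234', 'numa-0'),
    ('worker-a:1234', 'gpu-0'),
    ('worker-b:1234', 'numa-0'),
]

grouped = itertools.groupby(sorted(bands), key=lambda b: b[0])
address_to_bands = {k: list(v) for k, v in grouped}
# {'worker-a:1234': [('worker-a:1234', 'gpu-0'), ('worker-a:1234', 'numa-0')],
#  'worker-b:1234': [('worker-b:1234', 'numa-0')]}

grouped = itertools.groupby(
    sorted(('cpu' if b[1].startswith('numa') else 'gpu', b) for b in bands),
    key=lambda tp: tp[0]
)
device_type_to_bands = {k: [v[1] for v in tps] for k, tps in grouped}
# {'cpu': [('worker-a:1234', 'numa-0'), ('worker-b:1234', 'numa-0')],
#  'gpu': [('worker-a:1234', 'gpu-0')]}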

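A worked example (again editorial, with made-up band names and counts) of the per-device balancing in reassign_subtasks: CPU and GPU bands are balanced separately; within one device group, ready bands are moved towards the mean of the queued subtask counts, unready bands are drained completely, and the truncation remainder is credited to a randomly chosen ready band so that the moves sum to zero.

# hypothetical numbers: three ready CPU bands (one just came online)
# plus one unready band that still has queued subtasks
ready_bands = [('w1', 'numa-0'), ('w2', 'numa-0'), ('w3', 'numa-0')]
queued = {('w1', 'numa-0'): 9, ('w2', 'numa-0'): 3, ('w4', 'numa-0'): 2}

num_all_subtasks = sum(queued.values())          # 14
mean = int(num_all_subtasks / len(ready_bands))  # 4

all_bands = set(ready_bands) | set(queued)
moves = {}
for band in all_bands:
    if band in ready_bands:
        # ready bands are topped up (or drained) towards the mean
        moves[band] = mean - queued.get(band, 0)
    else:
        # unready bands give up everything they still hold
        moves[band] = -queued.get(band, 0)
# moves == {('w1', 'numa-0'): -5, ('w2', 'numa-0'): 1,
#           ('w3', 'numa-0'): 4, ('w4', 'numa-0'): -2}

# int() truncation makes mean an underestimate, so the total is <= 0;
# the shortfall is credited to one ready band so moves balance out
total_move = sum(moves.values())                 # -2
assert total_move <= 0
if total_move != 0:
    moves[ready_bands[0]] -= total_move          # stand-in for get_random_band(False)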