Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use batched request to apply for slots #2601

Merged
merged 1 commit into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion mars/dataframe/contrib/raydataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def __getstate__(self):

# The default __setstate__ will update _MLDataset's __dict__;


else:
_Dataset = None

Expand Down
1 change: 0 additions & 1 deletion mars/dataframe/contrib/raydataset/mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def __getstate__(self):

# The default __setstate__ will update _MLDataset's __dict__;


else:
_MLDataset = None

Expand Down
1 change: 1 addition & 0 deletions mars/services/scheduling/supervisor/globalslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def __pre_destroy__(self):
async def refresh_bands(self):
self._band_total_slots = await self._cluster_api.get_all_bands()

@mo.extensible
async def apply_subtask_slots(
self,
band: BandType,
Expand Down
37 changes: 32 additions & 5 deletions mars/services/scheduling/supervisor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
submit_aio_tasks = []
manager_ref = await self._get_manager_ref()

apply_delays = []
submit_items_list = []
submitted_bands = []

for band in bands:
band_limit = limit or self._band_slot_nums[band]
task_queue = self._band_queues[band]
Expand All @@ -181,17 +185,40 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
subtask_ids = list(submit_items)
if not subtask_ids:
continue

submitted_bands.append(band)
submit_items_list.append(submit_items)

# todo it is possible to provide slot data with more accuracy
subtask_slots = [1] * len(subtask_ids)

apply_delays.append(
self._slots_ref.apply_subtask_slots.delay(
band, self._session_id, subtask_ids, subtask_slots
)
)

async with redirect_subtask_errors(
self,
[
item.subtask
for submit_items in submit_items_list
for item in submit_items.values()
],
):
submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch(
*apply_delays
)

for band, submit_items, submitted_ids in zip(
submitted_bands, submit_items_list, submitted_ids_list
):
subtask_ids = list(submit_items)
task_queue = self._band_queues[band]

async with redirect_subtask_errors(
self, [item.subtask for item in submit_items.values()]
):
submitted_ids = set(
await self._slots_ref.apply_subtask_slots(
band, self._session_id, subtask_ids, subtask_slots
)
)
non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
if submitted_ids:
for stid in subtask_ids:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ async def create(cls, address: str, **kw):


class MockSlotsActor(mo.Actor):
@mo.extensible
def apply_subtask_slots(
self,
band: Tuple,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self):
def set_capacity(self, capacity: int):
self._capacity = capacity

@mo.extensible
def apply_subtask_slots(
self,
band: Tuple,
Expand Down
1 change: 0 additions & 1 deletion mars/storage/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def __del__(self):
if os.name != "nt" and fd >= 0:
os.close(fd)


except ImportError: # pragma: no cover
# allow shared_memory package to be absent
SharedMemory = SharedMemoryForRead = None
Expand Down