Skip to content

Commit

Permalink
[BACKPORT] Fix race condition of set_subtask_result (#2784) (#2819)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Mar 14, 2022
1 parent a6413b4 commit 10d7c60
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 3 deletions.
4 changes: 3 additions & 1 deletion mars/services/scheduling/worker/workerslot.py
Expand Up @@ -166,7 +166,9 @@ def release_free_slot(self, slot_id: int, session_stid: Tuple[str, str]):
f"the releasing session_stid: {session_stid}"
)
acquired_slot_id = self._session_stid_to_slot.pop(acquired_session_stid)
assert acquired_slot_id == slot_id
assert (
acquired_slot_id == slot_id
), f"{acquired_session_stid}: acquired_slot_id {acquired_slot_id} != slot_id {slot_id}"

logger.debug("Slot %d released", slot_id)

Expand Down
5 changes: 5 additions & 0 deletions mars/services/task/supervisor/processor.py
Expand Up @@ -777,6 +777,11 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
stage_processor.subtask_snapshots.get(subtask)
)
if subtask_result.status.is_done:
# update stage_processor.subtask_results to avoid concurrent set_subtask_result
# since we release lock when `_decref_input_subtasks`.
stage_processor.subtask_results[subtask] = subtask_result.update(
stage_processor.subtask_results.get(subtask)
)
try:
# Since every worker will call supervisor to set subtask result,
# we need to release actor lock to make `decref_chunks` parallel to avoid blocking
Expand Down
7 changes: 6 additions & 1 deletion mars/services/task/supervisor/stage.py
Expand Up @@ -88,6 +88,11 @@ def is_cancelled(self):
return self._cancelled.is_set()

async def _schedule_subtasks(self, subtasks: List[Subtask]):
subtasks = [
subtask
for subtask in subtasks
if subtask.subtask_id not in self._submitted_subtask_ids
]
if not subtasks:
return
self._submitted_subtask_ids.update(subtask.subtask_id for subtask in subtasks)
Expand Down Expand Up @@ -117,7 +122,7 @@ def _schedule_done(self):

async def set_subtask_result(self, result: SubtaskResult):
subtask = self.subtask_id_to_subtask[result.subtask_id]
self.subtask_results[subtask] = result.update(self.subtask_results.get(subtask))
# update subtask_results in `TaskProcessorActor.set_subtask_result`
self._submitted_subtask_ids.difference_update([result.subtask_id])

all_done = len(self.subtask_results) == len(self.subtask_graph)
Expand Down
2 changes: 1 addition & 1 deletion mars/tensor/datastore/to_vineyard.py
Expand Up @@ -109,7 +109,7 @@ def execute(cls, ctx, op):
needs_put = True
if needs_put:
tensor_id = client.put(
ctx[op.inputs[0].key], partition_index=op.inputs[0].index
np.asarray(ctx[op.inputs[0].key]), partition_index=op.inputs[0].index
)
else: # pragma: no cover
meta = client.get_meta(tensor_id)
Expand Down

0 comments on commit 10d7c60

Please sign in to comment.