Skip to content

Commit

Permalink
PullRequest: 145 fix set_subtask_result race condition
Browse files Browse the repository at this point in the history
Merge branch fix_set_subtask_result_race_condition of git@gitlab.alipay-inc.com:ray-project/mars.git into master
https://code.alipay.com/ray-project/mars/pull_requests/145

Signed-off-by: 天苍 <yiming.yym@antgroup.com>
Signed-off-by: 留宝 <po.lb@antgroup.com>

* fix set_subtask_result race condition

* fix set_subtask_result race condition
  • Loading branch information
chaokunyang committed Mar 8, 2022
1 parent 9b3cc49 commit fde4670
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
5 changes: 5 additions & 0 deletions mars/services/task/supervisor/processor.py
Expand Up @@ -887,6 +887,11 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
stage_processor.subtask_snapshots.get(subtask)
)
if subtask_result.status.is_done:
# Update stage_processor.subtask_results here to guard against concurrent
# set_subtask_result calls, since we release the lock while `_decref_input_subtasks` runs.
stage_processor.subtask_results[subtask] = subtask_result.update(
stage_processor.subtask_results.get(subtask)
)
try:
# Since every worker will call supervisor to set subtask result,
# we need to release actor lock to make `decref_chunks` parallel to avoid blocking
Expand Down
2 changes: 1 addition & 1 deletion mars/services/task/supervisor/stage.py
Expand Up @@ -125,7 +125,7 @@ def _schedule_done(self):

async def set_subtask_result(self, result: SubtaskResult):
subtask = self.subtask_id_to_subtask[result.subtask_id]
self.subtask_results[subtask] = result.update(self.subtask_results.get(subtask))
# subtask_results is updated in `TaskProcessorActor.set_subtask_result`
self._submitted_subtask_ids.difference_update([result.subtask_id])

all_done = len(self.subtask_results) == len(self.subtask_graph)
Expand Down

0 comments on commit fde4670

Please sign in to comment.