Skip to content

Commit

Permalink
Add a lock in the coordinator's RunStep as well.
Browse files Browse the repository at this point in the history
See also: alibaba#1445

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow committed Apr 19, 2022
1 parent 0479c0a commit 6e425ec
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def __init__(self, launcher, dangling_timeout_seconds, log_level="INFO"):
)
self._dangling_detecting_timer.start()

# a lock that protects the coordinator
self._lock = threading.Lock()

atexit.register(self._cleanup)

def __del__(self):
Expand Down Expand Up @@ -593,8 +596,9 @@ def run_on_coordinator(
return response_head, response_bodies

def RunStep(self, request_iterator, context):
for response in self.RunStepWrapped(request_iterator, context):
yield response
with self._lock:
for response in self.RunStepWrapped(request_iterator, context):
yield response

def _RunStep(self, request_iterator, context):
# split dag
Expand Down

0 comments on commit 6e425ec

Please sign in to comment.