Skip to content

Commit

Permalink
fix repeat deletion of kv item (#73)
Browse files Browse the repository at this point in the history
Fix repeat deletion of kv item
  • Loading branch information
wjsi authored and qinxuye committed Dec 27, 2018
1 parent 0c4d8aa commit 52aa0a2
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions mars/scheduler/chunkmeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ def delete_meta(self, session_id, chunk_key):
query_key = (session_id, chunk_key)
try:
del self._meta_store[query_key]
if self._kv_store_ref is not None:
self._kv_store_ref.delete('/sessions/%s/chunks/%s' % (session_id, chunk_key),
recursive=True, _tell=True, _wait=False)
except KeyError:
pass
try:
Expand All @@ -273,15 +276,10 @@ def delete_meta(self, session_id, chunk_key):
pass

# broadcast deletion into pre-determined destinations
futures = []
if query_key in self._meta_broadcasts:
for dest in self._meta_broadcasts[query_key]:
futures.append(self.ctx.actor_ref(self.default_name(), address=dest) \
.delete_meta(session_id, chunk_key, _wait=False, _tell=True))
if self._kv_store_ref is not None:
futures.append(self._kv_store_ref.delete('/sessions/%s/chunks/%s' % (session_id, chunk_key),
recursive=True, _tell=True, _wait=False))
[f.result() for f in futures]
self.ctx.actor_ref(self.default_name(), address=dest) \
.delete_meta(session_id, chunk_key, _wait=False, _tell=True)

def batch_delete_meta(self, session_id, chunk_keys):
"""
Expand Down

0 comments on commit 52aa0a2

Please sign in to comment.