Skip to content

Commit

Permalink
Remove broadcast entries of deleted items (#76)
Browse files Browse the repository at this point in the history
* Remove broadcast entries of deleted items

* add coverage excludes
  • Loading branch information
wjsi authored and qinxuye committed Dec 28, 2018
1 parent 68c04d3 commit 23d5e94
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 3 deletions.
7 changes: 7 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ omit =
mars/lib/uhashring/*
mars/serialize/protos/*
*/tests/*

[report]
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
7 changes: 7 additions & 0 deletions .coveragerc-tensor
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ omit =
mars/lib/uhashring/*
mars/serialize/protos/*
*/tests/*

[report]
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
20 changes: 17 additions & 3 deletions mars/scheduler/chunkmeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ def set_chunk_broadcasts(self, session_id, chunk_key, broadcast_dests):
self._meta_broadcasts[(session_id, chunk_key)] = \
[d for d in broadcast_dests if d != self.address]

def get_chunk_broadcasts(self, session_id, chunk_key):
"""
Get chunk broadcast addresses, for test only
:param session_id: session id
:param chunk_key: chunk key
"""
return self._meta_broadcasts.get((session_id, chunk_key))

def set_chunk_meta(self, session_id, chunk_key, size=None, workers=None):
"""
Update chunk meta in current storage
Expand Down Expand Up @@ -280,6 +288,7 @@ def delete_meta(self, session_id, chunk_key):
for dest in self._meta_broadcasts[query_key]:
self.ctx.actor_ref(self.default_name(), address=dest) \
.delete_meta(session_id, chunk_key, _wait=False, _tell=True)
del self._meta_broadcasts[query_key]

def batch_delete_meta(self, session_id, chunk_keys):
"""
Expand All @@ -296,11 +305,16 @@ def remove_workers_in_session(self, session_id, workers):
:return: keys of lost chunks
"""
logger.debug('Removing workers %r from store', workers)
chunks = set()
removed_chunks = set()
for w in workers:
self._meta_cache.remove_worker_keys(w, lambda k: k[0] == session_id)
chunks.update(self._meta_store.remove_worker_keys(w, lambda k: k[0] == session_id))
return [k[1] for k in chunks]
removed_chunks.update(self._meta_store.remove_worker_keys(w, lambda k: k[0] == session_id))
for c in removed_chunks:
try:
del self._meta_broadcasts[c]
except KeyError:
pass
return [k[1] for k in removed_chunks]


class ChunkMetaActor(SchedulerActor):
Expand Down
16 changes: 16 additions & 0 deletions mars/scheduler/tests/test_chunkmeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def _mock_get_scheduler(key):

client = new_client()
ref1 = client.actor_ref(ChunkMetaActor.default_name(), address=endpoints[0])
ref2 = client.actor_ref(ChunkMetaActor.default_name(), address=endpoints[0])
local_ref1 = client.actor_ref(LocalChunkMetaActor.default_name(), address=endpoints[0])
local_ref2 = client.actor_ref(LocalChunkMetaActor.default_name(), address=endpoints[1])

Expand All @@ -201,13 +202,28 @@ def _mock_get_scheduler(key):

ref1.set_chunk_broadcasts(session_id, key1, [endpoints[1]])
ref1.set_chunk_size(session_id, key1, 512)
ref1.add_worker(session_id, key1, 'abc')
ref2.set_chunk_broadcasts(session_id, key2, [endpoints[0]])
ref2.set_chunk_size(session_id, key2, 512)
ref2.add_worker(session_id, key2, 'def')
pool2.sleep(0.1)

self.assertEqual(local_ref1.get_chunk_meta(session_id, key1).chunk_size, 512)
self.assertEqual(local_ref1.get_chunk_broadcasts(session_id, key1), [endpoints[1]])
self.assertEqual(local_ref2.get_chunk_meta(session_id, key1).chunk_size, 512)
self.assertEqual(local_ref2.get_chunk_broadcasts(session_id, key2), [endpoints[0]])

ref1.delete_meta(session_id, key1)
pool2.sleep(0.1)

self.assertIsNone(local_ref1.get_chunk_meta(session_id, key1))
self.assertIsNone(local_ref2.get_chunk_meta(session_id, key1))
self.assertIsNone(local_ref1.get_chunk_broadcasts(session_id, key1))

local_ref1.remove_workers_in_session(session_id, ['def'])
local_ref2.remove_workers_in_session(session_id, ['def'])
pool2.sleep(0.1)

self.assertIsNone(local_ref1.get_chunk_meta(session_id, key2))
self.assertIsNone(local_ref2.get_chunk_meta(session_id, key2))
self.assertIsNone(local_ref2.get_chunk_broadcasts(session_id, key2))

0 comments on commit 23d5e94

Please sign in to comment.