Use ChunkMetaActor to replace KVStoreActor (#37)
* Use ChunkMetaActor to replace KVStoreActor

* Add worker -> chunks mapping and support for deleting workers

* Skip some must-fail tests under Windows

* Apply code fixes
wjsi authored and qinxuye committed Dec 27, 2018
1 parent 43c78eb commit 0c4d8aa
Showing 30 changed files with 1,049 additions and 379 deletions.
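
The heart of the change: chunk metadata moves out of path-style KV-store reads ('/sessions/<id>/chunks/<key>/...') into a dedicated ChunkMetaActor queried in batches. Below is a runnable sketch of the before/after access pattern; ChunkMeta and FakeChunkMetaStore are illustrative stand-ins, not the real classes in mars/scheduler/chunkmeta.py.

```python
from collections import namedtuple

# Stand-in for the meta record served by ChunkMetaActor; the diff below
# relies on its chunk_size and workers fields.
ChunkMeta = namedtuple('ChunkMeta', 'chunk_size workers')


class FakeChunkMetaStore(object):
    """In-memory stand-in for the query side of ChunkMetaActor."""

    def __init__(self):
        self._meta = dict()  # (session_id, chunk_key) -> ChunkMeta

    def set_chunk_meta(self, session_id, chunk_key, size, workers):
        self._meta[(session_id, chunk_key)] = ChunkMeta(size, set(workers))

    def batch_get_chunk_meta(self, session_id, chunk_keys):
        # One round trip for many keys, replacing per-key KV reads such as
        # kv_store.read('/sessions/%s/chunks/%s/data_size' % (...)).value
        return [self._meta.get((session_id, k)) for k in chunk_keys]


store = FakeChunkMetaStore()
store.set_chunk_meta('session1', 'c1', 512, ['w1:1234'])
print(store.batch_get_chunk_meta('session1', ['c1', 'c2']))
# [ChunkMeta(chunk_size=512, workers={'w1:1234'}), None]
```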
26 changes: 16 additions & 10 deletions mars/api.py
@@ -17,7 +17,7 @@
 from .actors import new_client
 from .cluster_info import ClusterInfoActor
 from .node_info import NodeInfoActor
-from .scheduler import SessionActor, GraphActor, KVStoreActor
+from .scheduler import SessionActor, GraphActor, GraphMetaActor, ResourceActor
 from .scheduler.session import SessionManagerActor
 from .scheduler.graph import ResultReceiverActor

@@ -29,7 +29,6 @@ def __init__(self, scheduler_ip):
         self.actor_client = new_client()
         self.cluster_info = self.actor_client.actor_ref(
             ClusterInfoActor.default_name(), address=scheduler_ip)
-        self.kv_store = self.get_actor_ref(KVStoreActor.default_name())
         self.session_manager = self.get_actor_ref(SessionManagerActor.default_name())

     def get_actor_ref(self, uid):
@@ -47,9 +46,8 @@ def get_schedulers_info(self):

     def count_workers(self):
         try:
-            worker_info = self.kv_store.read('/workers/meta')
-            workers_num = len(worker_info.children)
-            return workers_num
+            uid = ResourceActor.default_name()
+            return self.get_actor_ref(uid).get_worker_count()
         except KeyError:
             return 0

@@ -70,16 +68,24 @@ def delete_graph(self, session_id, graph_key):
         graph_ref.destroy()

     def stop_graph(self, session_id, graph_key):
+        from .scheduler import GraphState
+        graph_meta_uid = GraphMetaActor.gen_name(session_id, graph_key)
+        self.get_actor_ref(graph_meta_uid).set_state(GraphState.CANCELLING)
+
         graph_uid = GraphActor.gen_name(session_id, graph_key)
         graph_ref = self.get_actor_ref(graph_uid)
         graph_ref.stop_graph()

     def get_graph_state(self, session_id, graph_key):
-        from .scheduler.utils import GraphState
-
-        state_obj = self.kv_store.read(
-            '/sessions/%s/graph/%s/state' % (session_id, graph_key), silent=True)
-        state = state_obj.value if state_obj else 'preparing'
+        from .scheduler import GraphState
+
+        graph_meta_uid = GraphMetaActor.gen_name(session_id, graph_key)
+        graph_meta_ref = self.get_actor_ref(graph_meta_uid)
+        if self.actor_client.has_actor(graph_meta_ref):
+            state_obj = graph_meta_ref.get_state()
+            state = state_obj.value if state_obj else 'preparing'
+        else:
+            state = 'preparing'
         state = GraphState(state.lower())
         return state

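Worth noting in the hunk above: a graph's state is now served by its own GraphMetaActor, and a missing actor or an empty state object both fall back to 'preparing'. A runnable sketch of just that fallback logic follows; the GraphState members other than CANCELLING and 'preparing' seen in this diff are assumptions.

```python
from enum import Enum


class GraphState(Enum):
    # Member set is assumed for illustration; the real enum lives in
    # mars/scheduler/utils.py.
    PREPARING = 'preparing'
    RUNNING = 'running'
    CANCELLING = 'cancelling'


def resolve_graph_state(graph_meta_ref, has_actor):
    # Mirrors MarsAPI.get_graph_state above: no meta actor yet, or an
    # empty state object, both resolve to PREPARING.
    if has_actor:
        state_obj = graph_meta_ref.get_state()
        state = state_obj.value if state_obj else 'preparing'
    else:
        state = 'preparing'
    return GraphState(state.lower())


class FakeMetaRef(object):
    def get_state(self):
        return GraphState.RUNNING


print(resolve_graph_state(FakeMetaRef(), True))   # GraphState.RUNNING
print(resolve_graph_state(None, False))           # GraphState.PREPARING
```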
2 changes: 1 addition & 1 deletion mars/compat/__init__.py
@@ -438,7 +438,7 @@ def _exitfunc(cls):
                gc.enable()


-__all__ = ['sys', 'builtins', 'logging.config', 'OrderedDict', 'dictconfig', 'suppress',
+__all__ = ['PY27', 'sys', 'builtins', 'logging.config', 'OrderedDict', 'dictconfig', 'suppress',
            'reduce', 'reload_module', 'Queue', 'PriorityQueue', 'Empty', 'ElementTree', 'ElementTreeParseError',
            'urlretrieve', 'pickle', 'urlencode', 'urlparse', 'unquote', 'quote', 'quote_plus', 'parse_qsl',
            'Enum', 'ConfigParser', 'decimal', 'Decimal', 'DECIMAL_TYPES', 'FixedOffset', 'utc', 'finalize',
2 changes: 1 addition & 1 deletion mars/deploy/local/core.py
@@ -104,7 +104,7 @@ def start_service(self):
         self._pool = create_actor_pool(self._endpoint, n_process, distributor=distributor)

         # start scheduler first
-        self._scheduler_service.start(self._endpoint, self._pool)
+        self._scheduler_service.start(self._endpoint, None, self._pool)

         # start worker next
         self._worker_service.start_local(self._endpoint, self._pool, self._scheduler_n_process)
4 changes: 4 additions & 0 deletions mars/errors.py
@@ -91,6 +91,10 @@ class SpillNotConfigured(MarsError):
     pass


+class GraphNotExists(MarsError):
+    pass
+
+
 class PromiseTimeout(MarsError):
     pass

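The diff only adds the exception class; here is a hypothetical caller-side sketch of how such an error might be raised and handled (the registry dict and helper are invented for illustration).

```python
class MarsError(Exception):
    pass


class GraphNotExists(MarsError):
    pass


def get_graph_record(graph_records, graph_key):
    # Hypothetical helper: translate a missing registry entry into the
    # domain-specific error instead of a bare KeyError.
    try:
        return graph_records[graph_key]
    except KeyError:
        raise GraphNotExists('Graph %s does not exist' % graph_key)
```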
4 changes: 3 additions & 1 deletion mars/scheduler/__init__.py
@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from .graph import GraphActor
+from .chunkmeta import ChunkMetaActor
+from .graph import GraphActor, GraphMetaActor
 from .operand import OperandActor
 from .assigner import AssignerActor
+from .resource import ResourceActor
 from .session import SessionActor, SessionManagerActor
 from .kvstore import KVStoreActor
 from .utils import GraphState, OperandState
2 changes: 1 addition & 1 deletion mars/scheduler/__main__.py
@@ -35,7 +35,7 @@ def create_pool(self, *args, **kwargs):
         return super(SchedulerApplication, self).create_pool(*args, **kwargs)

     def start_service(self):
-        super(SchedulerApplication, self).start(self.endpoint, self.pool)
+        super(SchedulerApplication, self).start(self.endpoint, self.args.schedulers, self.pool)

     def stop_service(self):
         super(SchedulerApplication, self).stop(self.pool)
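Both call sites show the new signature: the scheduler service's start() now takes an explicit schedulers argument between the endpoint and the pool, self.args.schedulers for a standalone scheduler and None for the local deploy above. A stand-in sketch of the two call shapes follows; this SchedulerService is illustrative, not the real service class.

```python
class SchedulerService(object):
    """Illustrative stand-in showing only the new start() signature."""

    def start(self, endpoint, schedulers, pool):
        # Assumption for illustration: when no scheduler list is given
        # (local mode), the service treats its own endpoint as the cluster.
        self._schedulers = list(schedulers) if schedulers else [endpoint]


service = SchedulerService()
service.start('127.0.0.1:7103', None, pool=None)              # local deploy
service.start('10.0.0.1:7103', ['10.0.0.1:7103'], pool=None)  # standalone
```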
47 changes: 14 additions & 33 deletions mars/scheduler/assigner.py
@@ -15,16 +15,16 @@
 import copy
 import heapq
 import logging
+import os
 import random
 import time
-import os
 from collections import defaultdict

 from .. import promise
 from ..config import options
 from ..utils import log_unhandled
+from .chunkmeta import ChunkMetaActor
 from .resource import ResourceActor
-from .kvstore import KVStoreActor
 from .utils import SchedulerActor

 logger = logging.getLogger(__name__)
@@ -205,7 +205,7 @@ def __init__(self, assigner_ref):
         self._cluster_info_ref = None
         self._assigner_ref = assigner_ref
         self._resource_actor_ref = None
-        self._kv_store_ref = None
+        self._chunk_meta_ref = None

         self._sufficient_operands = set()
         self._operand_sufficient_time = dict()
@@ -216,7 +216,7 @@ def post_create(self):
         self.set_cluster_info_ref()
         self._assigner_ref = self.ctx.actor_ref(self._assigner_ref)
         self._resource_actor_ref = self.get_actor_ref(ResourceActor.default_name())
-        self._kv_store_ref = self.get_actor_ref(KVStoreActor.default_name())
+        self._chunk_meta_ref = self.ctx.actor_ref(ChunkMetaActor.default_name())

         self.periodical_allocate()

@@ -282,16 +282,15 @@ def _allocate_resource(self, session_id, op_key, op_info, target_worker=None, re

         reject_workers = reject_workers or set()

-        op_path = '/sessions/%s/operands/%s' % (session_id, op_key)
-
         op_io_meta = op_info['io_meta']
         input_chunk_keys = op_io_meta['input_chunks']
-        input_sizes = dict(zip(input_chunk_keys, self._get_multiple_chunk_size(session_id, input_chunk_keys)))
+        metas = self._get_chunks_meta(session_id, input_chunk_keys)
+        input_sizes = dict((k, meta.chunk_size) for k, meta in metas.items())
         output_size = op_info['output_size']

         if target_worker is None:
             op_name = op_info['op_name']
-            who_has = dict(zip(input_chunk_keys, self._get_multiple_who_has(session_id, input_chunk_keys)))
+            who_has = dict((k, meta.workers) for k, meta in metas.items())

             candidate_workers = self._get_eps_by_worker_locality(input_chunk_keys, who_has, input_sizes)
             locality_workers = set(candidate_workers)
@@ -319,31 +318,13 @@
             logger.debug('Operand %s(%s) allocated to run in %s given collected statistics',
                          op_key, op_info['op_name'], worker_ep)

-            self._kv_store_ref.write('%s/worker' % op_path, worker_ep)
             self.tell_promise(callback, worker_ep)
             return worker_ep, rejects
             rejects.append(worker_ep)
         return None, rejects

-    def _get_who_has(self, session_id, chunk_key):
-        return [ch.key.rsplit('/', 1)[-1] for ch in self._kv_store_ref.read(
-            '/sessions/%s/chunks/%s/workers' % (session_id, chunk_key)).children]
-
-    def _get_multiple_who_has(self, session_id, chunk_keys):
-        keys = ['/sessions/%s/chunks/%s/workers' % (session_id, chunk_key) for chunk_key in chunk_keys]
-        for result in self._kv_store_ref.read_batch(keys):
-            yield [ch.key.rsplit('/', 1)[-1] for ch in result.children]
-
-    def _get_chunk_size(self, session_id, chunk_key):
-        return self._kv_store_ref.read(
-            '/sessions/%s/chunks/%s/data_size' % (session_id, chunk_key)).value
-
-    def _get_multiple_chunk_size(self, session_id, chunk_keys):
-        if not chunk_keys:
-            return tuple()
-        keys = ['/sessions/%s/chunks/%s/data_size' % (session_id, chunk_key)
-                for chunk_key in chunk_keys]
-        return (res.value for res in self._kv_store_ref.read_batch(keys))
+    def _get_chunks_meta(self, session_id, keys):
+        return dict(zip(keys, self._chunk_meta_ref.batch_get_chunk_meta(session_id, keys)))

     def _get_op_metric_item(self, ep, op_name, item):
         return self._get_metric_item(ep, 'calc_speed.' + op_name, item)
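
The single _get_chunks_meta helper above replaces four path-building readers. Below is a runnable sketch of how _allocate_resource consumes its result, deriving both sizes and locations from one batched lookup; ChunkMeta again stands in for the real meta record.

```python
from collections import namedtuple

ChunkMeta = namedtuple('ChunkMeta', 'chunk_size workers')


def split_meta(metas):
    # metas: chunk_key -> ChunkMeta, as returned by _get_chunks_meta.
    input_sizes = dict((k, meta.chunk_size) for k, meta in metas.items())
    who_has = dict((k, meta.workers) for k, meta in metas.items())
    return input_sizes, who_has


metas = {'c1': ChunkMeta(512, {'w1'}), 'c2': ChunkMeta(1024, {'w1', 'w2'})}
print(split_meta(metas))
# ({'c1': 512, 'c2': 1024}, {'c1': {'w1'}, 'c2': {'w1', 'w2'}})
```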
@@ -388,11 +369,11 @@ def _is_stats_sufficient(self, op_name):
             self._sufficient_operands.add(op_name)
         return True

-    def _get_eps_by_worker_locality(self, input_keys, who_has, input_sizes):
+    def _get_eps_by_worker_locality(self, input_keys, chunk_workers, input_sizes):
         locality_data = defaultdict(lambda: 0)
         for k in input_keys:
-            if k in who_has:
-                for ep in who_has[k]:
+            if k in chunk_workers:
+                for ep in chunk_workers[k]:
                     locality_data[ep] += input_sizes[k]
         workers = list(self._worker_metrics.keys())
         random.shuffle(workers)
@@ -406,7 +387,7 @@ def _get_eps_by_worker_locality(self, input_keys, who_has, input_sizes):
                 max_eps.append(ep)
         return max_eps

-    def _get_ep_by_worker_stats(self, input_keys, who_has, input_sizes, output_size, op_name):
+    def _get_ep_by_worker_stats(self, input_keys, chunk_workers, input_sizes, output_size, op_name):
         ep_net_speeds = dict()
         if any(self._get_metric_item(ep, 'net_transfer_speed', 'count') <= options.optimize.min_stats_count
                for ep in self._worker_metrics):
@@ -428,7 +409,7 @@ def _get_ep_by_worker_stats(self, input_keys, who_has, input_sizes, output_size,
         ep_calc_time = defaultdict(lambda: 0)
         locality_data = defaultdict(lambda: 0)
         for key in input_keys:
-            contain_eps = who_has.get(key, set())
+            contain_eps = chunk_workers.get(key, set())
             for ep in self._worker_metrics:
                 if ep not in contain_eps:
                     ep_transmit_times[ep].append(input_sizes[key] * 1.0 / ep_net_speeds[ep])
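The chunk_workers rename flows through the locality scorer unchanged: each worker is scored by the total bytes of input chunks it already holds, and all workers tied for the best score are returned. A runnable re-implementation for illustration; the scoring loop between the two hunks above is not part of this diff, so the tie handling here is an assumption.

```python
import random
from collections import defaultdict


def eps_by_worker_locality(input_keys, chunk_workers, input_sizes, all_workers):
    # Score each worker endpoint by the total size of inputs it holds.
    locality_data = defaultdict(lambda: 0)
    for k in input_keys:
        for ep in chunk_workers.get(k, ()):
            locality_data[ep] += input_sizes[k]
    workers = list(all_workers)
    random.shuffle(workers)  # break ties among equally local workers
    max_locality, max_eps = -1, []
    for ep in workers:
        if locality_data[ep] > max_locality:
            max_locality, max_eps = locality_data[ep], [ep]
        elif locality_data[ep] == max_locality:
            max_eps.append(ep)
    return max_eps


sizes = {'c1': 512, 'c2': 1024}
holders = {'c1': {'w1'}, 'c2': {'w1', 'w2'}}
print(eps_by_worker_locality(['c1', 'c2'], holders, sizes, ['w1', 'w2', 'w3']))
# ['w1'] -- w1 holds 1536 bytes of inputs, w2 holds 1024, w3 none
```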
