Assign enqueued operands immediately when no descendants are ready (#854)
wjsi authored and Xuye (Chris) Qin committed Dec 13, 2019
1 parent d853f9c commit 81f63cd
Showing 25 changed files with 345 additions and 152 deletions.
3 changes: 2 additions & 1 deletion mars/_utils.pxd
@@ -15,5 +15,6 @@
cpdef str to_str(s, encoding=*)
cpdef bytes to_binary(s, encoding=*)
cpdef unicode to_text(s, encoding=*)
cpdef register(cls, handler)
cpdef register_tokenizer(cls, handler)
cpdef tuple insert_reversed_tuple(tuple a, object x)

29 changes: 27 additions & 2 deletions mars/_utils.pyx
@@ -221,7 +221,32 @@ tokenize_handler.register(pd.Index, tokenize_pandas_index)
tokenize_handler.register(pd.Series, tokenize_pandas_series)
tokenize_handler.register(pd.DataFrame, tokenize_pandas_dataframe)

cpdef register(cls, handler):
cpdef register_tokenizer(cls, handler):
tokenize_handler.register(cls, handler)

__all__ = ['to_str', 'to_binary', 'to_text', 'tokenize', 'tokenize_int', 'register']

cpdef tuple insert_reversed_tuple(tuple a, object x):
cdef int mid, lo = 0, hi = len(a), len_a = hi
cdef object el

if len_a == 0:
return x,

while lo < hi:
mid = (lo + hi) // 2
if a[mid] > x: lo = mid + 1
else: hi = mid

if lo == len_a:
return a + (x,)
el = a[lo]
if el == x:
return a
elif lo == 0 and el < x:
return (x,) + a
else:
return a[:lo] + (x,) + a[lo:]


__all__ = ['to_str', 'to_binary', 'to_text', 'tokenize', 'tokenize_int', 'register_tokenizer',
'insert_reversed_tuple']
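
For context, the new insert_reversed_tuple helper performs a binary-search insert into a tuple kept in descending order, returning the input unchanged when the value is already present. A minimal pure-Python sketch of the same behavior (the _py-suffixed name is illustrative only, not part of this commit):

def insert_reversed_tuple_py(a, x):
    # binary search over a tuple sorted in descending order
    lo, hi = 0, len(a)
    while lo < hi:
        mid = (lo + hi) // 2
        if a[mid] > x:
            lo = mid + 1
        else:
            hi = mid
    if lo == len(a):
        return a + (x,)
    if a[lo] == x:
        # value already present: return the tuple unchanged
        return a
    return a[:lo] + (x,) + a[lo:]

assert insert_reversed_tuple_py((), 3) == (3,)
assert insert_reversed_tuple_py((9, 5, 2), 7) == (9, 7, 5, 2)
assert insert_reversed_tuple_py((9, 5, 2), 5) == (9, 5, 2)
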
12 changes: 7 additions & 5 deletions mars/config.py
@@ -24,10 +24,6 @@
from .compat import six


DEFAULT_CHUNK_SIZE = 1496
DEFAULT_CONNECT_RETRY_TIMES = 4
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_READ_TIMEOUT = 120
_DEFAULT_REDIRECT_WARN = 'Option {source} has been replaced by {target} and might be removed in a future release.'


@@ -322,7 +318,7 @@ def validate(x):
default_options.register_option('scheduler.enable_chunk_relocation', False, validator=is_bool, serialize=True)
default_options.register_option('scheduler.check_interval', 1, validator=is_integer, serialize=True)
default_options.register_option('scheduler.default_cpu_usage', 1, validator=(is_integer, is_float), serialize=True)
default_options.register_option('scheduler.default_cuda_usage', 1, validator=(is_integer, is_float), serialize=True)
default_options.register_option('scheduler.default_cuda_usage', 0.5, validator=(is_integer, is_float), serialize=True)
default_options.register_option('scheduler.execution_timeout', 600, validator=is_integer, serialize=True)
default_options.register_option('scheduler.retry_num', 4, validator=is_integer, serialize=True)
default_options.register_option('scheduler.fetch_limit', 10 * 1024 ** 2, validator=is_integer, serialize=True)
@@ -334,6 +330,11 @@ def validate(x):
default_options.register_option('scheduler.status_timeout', 60, validator=is_numeric, serialize=True)
default_options.register_option('scheduler.worker_blacklist_time', 3600, validator=is_numeric, serialize=True)

# enqueue operands in a batch when creating OperandActors
default_options.register_option('scheduler.batch_enqueue_initials', True, validator=is_bool, serialize=True)
# invoke assignment when there are no ready descendants
default_options.register_option('scheduler.aggressive_assign', False, validator=is_bool, serialize=True)

# Worker
default_options.register_option('worker.spill_directory', None, validator=(is_null, is_string, is_list))
default_options.register_option('worker.disk_compression', 'lz4', validator=is_string, serialize=True)
@@ -342,6 +343,7 @@ def validate(x):
default_options.register_option('worker.callback_preserve_time', 3600 * 24, validator=is_integer)
default_options.register_option('worker.event_preserve_time', 3600 * 24, validator=(is_integer, is_float))
default_options.register_option('worker.copy_block_size', 64 * 1024, validator=is_integer)
default_options.register_option('worker.cuda_thread_num', 2, validator=is_integer)
default_options.register_option('worker.transfer_block_size', 1 * 1024 ** 2, validator=is_integer)
default_options.register_option('worker.transfer_compression', 'lz4', validator=is_string, serialize=True)
default_options.register_option('worker.prepare_data_timeout', 600, validator=is_integer)
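
The two options introduced above gate the new scheduling behavior: scheduler.batch_enqueue_initials (default True) enqueues a graph's initial operands in one batch, and scheduler.aggressive_assign (default False) lets a finished operand request another assignment when none of its successors became ready. A brief usage sketch (a hypothetical call site, assuming the options are set before the scheduler starts):

from mars.config import options

# submit the graph's initial operands to the assigner in a single batch (default)
options.scheduler.batch_enqueue_initials = True
# when an operand finishes and no successor became ready, immediately ask the
# assigner to allocate one more enqueued operand
options.scheduler.aggressive_assign = True
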
42 changes: 33 additions & 9 deletions mars/scheduler/assigner.py
@@ -116,6 +116,8 @@ def __init__(self):
# only when it is out of date
self._worker_metric_time = 0

self._allocate_requests = []

def post_create(self):
logger.debug('Actor %s running in process %d', self.uid, os.getpid())

@@ -129,8 +131,14 @@ def post_create(self):
def pre_destroy(self):
self._actual_ref.destroy()

def allocate_top_resources(self):
self._actual_ref.allocate_top_resources(_tell=True)
def allocate_top_resources(self, max_allocates=None):
self._allocate_requests.append(max_allocates)
self._actual_ref.allocate_top_resources(fetch_requests=True, _tell=True, _wait=False)

def get_allocate_requests(self):
reqs = self._allocate_requests
self._allocate_requests = []
return reqs

def mark_metrics_expired(self):
logger.debug('Metrics cache marked as expired.')
@@ -166,19 +174,21 @@ def apply_for_resource(self, session_id, op_key, op_info, callback=None):
:param op_info: operand information, should be a dict
:param callback: promise callback, called when the resource is assigned
"""
self._allocate_requests.append(1)
self._refresh_worker_metrics()
self._enqueue_operand(session_id, op_key, op_info, callback)
logger.debug('Operand %s enqueued', op_key)
self._actual_ref.allocate_top_resources(_tell=True)
self._actual_ref.allocate_top_resources(fetch_requests=True, _tell=True, _wait=False)

@log_unhandled
def apply_for_multiple_resources(self, session_id, applications):
self._allocate_requests.append(len(applications))
self._refresh_worker_metrics()
logger.debug('%d operands applied for session %s', len(applications), session_id)
for app in applications:
op_key, op_info = app
self._enqueue_operand(session_id, op_key, op_info)
self._actual_ref.allocate_top_resources(_tell=True)
self._actual_ref.allocate_top_resources(fetch_requests=True, _tell=True)

@log_unhandled
def update_priority(self, op_key, priority_data):
@@ -264,7 +274,7 @@ def periodical_allocate(self):
self.allocate_top_resources()
self.ref().periodical_allocate(_tell=True, _delay=0.5)

def allocate_top_resources(self):
def allocate_top_resources(self, fetch_requests=False):
"""
Allocate resources given the order in AssignerActor
"""
@@ -276,10 +286,20 @@ def allocate_top_resources(self):
if not self._worker_metrics:
return

if fetch_requests:
requests = self._assigner_ref.get_allocate_requests()
if not requests:
return
max_allocates = sys.maxsize if any(v is None for v in requests) else sum(requests)
else:
max_allocates = sys.maxsize

unassigned = []
reject_workers = set()
# the assigning procedure will continue till
while len(reject_workers) < len(self._worker_metrics):
assigned = 0
# the assigning procedure will continue till all workers rejected
# or max_allocates reached
while len(reject_workers) < len(self._worker_metrics) and assigned < max_allocates:
item = self._assigner_ref.pop_head()
if not item:
break
@@ -298,13 +318,17 @@ def allocate_top_resources(self):
reject_workers.update(rejects)
if alloc_ep:
# assign successfully, we remove the application
self._assigner_ref.remove_apply(item.op_key)
self._assigner_ref.remove_apply(item.op_key, _tell=True)
assigned += 1
else:
# put the unassigned item into unassigned list to add back to the queue later
unassigned.append(item)
if unassigned:
# put unassigned back to the queue, if any
self._assigner_ref.extend(unassigned)
self._assigner_ref.extend(unassigned, _tell=True)

if not fetch_requests:
self._assigner_ref.get_allocate_requests(_tell=True, _wait=False)

@log_unhandled
def _allocate_resource(self, session_id, op_key, op_info, target_worker=None, reject_workers=None):
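
In the reworked assigner, each apply_for_resource / allocate_top_resources call records a request count (or None for "as many as possible") that the evaluation actor later folds into a single assignment budget. A condensed sketch of that folding step (simplified names, not the actual actor code):

import sys

def fold_allocate_requests(requests):
    # None means "no limit"; otherwise individual budgets add up
    if not requests:
        return 0
    if any(v is None for v in requests):
        return sys.maxsize
    return sum(requests)

assert fold_allocate_requests([]) == 0             # nothing pending, nothing to assign
assert fold_allocate_requests([1, 3]) == 4         # two batched applications
assert fold_allocate_requests([1, None]) == sys.maxsize
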
4 changes: 2 additions & 2 deletions mars/scheduler/chunkmeta.py
@@ -390,12 +390,12 @@ class ChunkMetaClient(object):
"""
Actor dispatches chunk meta requests to different scheduler hosts
"""
def __init__(self, ctx, cluster_info_ref):
def __init__(self, ctx, cluster_info_ref, has_local_cache=True):
self._cluster_info = cluster_info_ref
self.ctx = ctx
self._local_meta_store_ref = ctx.actor_ref(
ChunkMetaActor.default_uid(), address=cluster_info_ref.address)
if not ctx.has_actor(self._local_meta_store_ref):
if not has_local_cache or not ctx.has_actor(self._local_meta_store_ref):
self._local_meta_store_ref = None

def get_scheduler(self, key):
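
The new has_local_cache flag lets callers opt out of the local ChunkMetaActor lookup. A hypothetical call site (ctx and cluster_info_ref stand in for whatever references the caller already holds):

# skip the local meta cache entirely, always dispatch to remote schedulers
client = ChunkMetaClient(ctx, cluster_info_ref, has_local_cache=False)
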
3 changes: 2 additions & 1 deletion mars/scheduler/graph.py
@@ -838,7 +838,8 @@ def create_operand_actors(self, _clean_info=True, _start=True):
scheduler_addr = self.get_scheduler(op_uid)
kw = {}
# for the **real** initial chunks, we do batch submitting
if chunk_graph.count_predecessors(chunks[0]) == 0:
if options.scheduler.batch_enqueue_initials \
and chunk_graph.count_predecessors(chunks[0]) == 0:
kw['allocated'] = True
to_allocate_op_keys.add(op_key)

13 changes: 8 additions & 5 deletions mars/scheduler/operands/base.py
@@ -49,6 +49,8 @@ def __init__(self, session_id, graph_id, op_key, op_info, worker=None,

self._executable_dag = op_info.pop('executable_dag', None)

# set of running predecessors, used to broadcast priority changes
self._running_preds = set()
# set of finished predecessors, used to decide whether we should move the operand to ready
self._finish_preds = set()
# set of finished successors, used to detect whether we can do clean up
@@ -106,13 +108,11 @@ def state(self, value):
self._last_state, value)
self._state = value
self._info['state'] = value.name
futures = []
for graph_ref in self._graph_refs:
futures.append(graph_ref.set_operand_state(self._op_key, value, _tell=True, _wait=False))
graph_ref.set_operand_state(self._op_key, value, _tell=True, _wait=False)
if self._kv_store_ref is not None:
futures.append(self._kv_store_ref.write(
'%s/state' % self._op_path, value.name, _tell=True, _wait=False))
[f.result() for f in futures]
self._kv_store_ref.write(
'%s/state' % self._op_path, value.name, _tell=True, _wait=False)

@property
def worker(self):
@@ -213,6 +213,9 @@ def stop_operand(self, state=OperandState.CANCELLING):
if self.state != state:
self.start_operand(state)

def add_running_predecessor(self, op_key, worker):
self._running_preds.add(op_key)

def add_finished_predecessor(self, op_key, worker, output_sizes=None):
self._finish_preds.add(op_key)

74 changes: 43 additions & 31 deletions mars/scheduler/operands/common.py
@@ -20,7 +20,7 @@
from ...config import options
from ...errors import ExecutionInterrupted, DependencyMissing, WorkerDead
from ...operands import Operand
from ...utils import log_unhandled
from ...utils import log_unhandled, insert_reversed_tuple
from ..utils import GraphState, array_to_bytes
from .base import BaseOperandActor
from .core import OperandState, register_operand_class, rewrite_worker_errors
@@ -104,15 +104,22 @@ def start_operand(self, state=None, **kwargs):
self._target_worker = target_worker
return super(OperandActor, self).start_operand(state=state, **kwargs)

def add_running_predecessor(self, op_key, worker):
super(OperandActor, self).add_running_predecessor(op_key, worker)
self.update_demand_depths(self._info.get('optimize', {}).get('depth', 0))

def add_finished_predecessor(self, op_key, worker, output_sizes=None):
"""
This function shall return whether the current node is ready. The return values will
be collected by the predecessor to judge if a node with lower priority can be
scheduled.
"""
super(OperandActor, self).add_finished_predecessor(op_key, worker, output_sizes=output_sizes)
if all(k in self._finish_preds for k in self._pred_keys):
if self.state != OperandState.UNSCHEDULED:
return True
# all predecessors done, the operand can be executed now
self.start_operand(OperandState.READY)
if self.state == OperandState.UNSCHEDULED:
self.start_operand(OperandState.READY)
return True
self.ref().update_demand_depths(self._info.get('optimize', {}).get('depth', 0), _tell=True)
return False

def add_finished_successor(self, op_key, worker):
Expand All @@ -133,37 +140,29 @@ def update_demand_depths(self, depth):
produced by the current operand
:param depth: depth to update
"""
demand_depths = list(self._info.get('optimize', {}).get('demand_depths', ()))
if not demand_depths:
demand_depths = [depth]
else:
idx = 0
for idx, v in enumerate(demand_depths):
if v <= depth:
break
if demand_depths[idx] == depth:
return
elif demand_depths[idx] > depth:
demand_depths.append(depth)
else:
demand_depths.insert(idx, depth)
try:
optimize_data = self._info['optimize']
except KeyError:
optimize_data = self._info['optimize'] = dict()
optimize_data['demand_depths'] = tuple(demand_depths)

demand_depths = optimize_data.get('demand_depths', ())
new_demand_depths = insert_reversed_tuple(demand_depths, depth)
if demand_depths == new_demand_depths:
return
optimize_data['demand_depths'] = new_demand_depths
if self._kv_store_ref is not None:
self._kv_store_ref.write(
'%s/optimize/demand_depths' % self._op_path,
base64.b64encode(array_to_bytes('I', demand_depths)), _tell=True, _wait=False)
base64.b64encode(array_to_bytes('I', new_demand_depths)), _tell=True, _wait=False)

if self.state == OperandState.READY:
# if the operand is already submitted to AssignerActor, we need to update the priority
self._assigner_ref.update_priority(self._op_key, optimize_data, _tell=True, _wait=False)
else:
# send update command to predecessors
for in_key in self._pred_keys:
self._get_operand_actor(in_key).update_demand_depths(depth, _tell=True, _wait=False)
if in_key not in self._finish_preds and in_key not in self._running_preds:
self._get_operand_actor(in_key).update_demand_depths(depth, _tell=True, _wait=False)

def propose_descendant_workers(self, input_key, worker_scores, depth=1):
"""
@@ -388,12 +387,18 @@ def _apply_fail(*exc_info):
def _on_running(self):
self._execution_ref = self._get_execution_ref()

# notify successors to propagate priority changes
for out_key in self._succ_keys:
self._get_operand_actor(out_key).add_running_predecessor(
self._op_key, self.worker, _tell=True, _wait=False)

@log_unhandled
def _acceptor(data_sizes):
self._allocated = False
if not self._is_worker_alive():
return
self._resource_ref.deallocate_resource(self._session_id, self._op_key, self.worker, _tell=True)
self._resource_ref.deallocate_resource(
self._session_id, self._op_key, self.worker, _tell=True, _wait=False)

self._data_sizes = data_sizes
self._io_meta['data_targets'] = list(data_sizes)
@@ -404,7 +409,8 @@ def _rejecter(*exc):
self._allocated = False
# handling exception occurrence of operand execution
exc_type = exc[0]
self._resource_ref.deallocate_resource(self._session_id, self._op_key, self.worker, _tell=True)
self._resource_ref.deallocate_resource(
self._session_id, self._op_key, self.worker, _tell=True, _wait=False)

if self.state == OperandState.CANCELLING:
logger.warning('Execution of operand %s cancelled.', self._op_key)
@@ -451,21 +457,27 @@ def _on_finished(self):
self.start_operand(OperandState.CANCELLING)
return

futures = []
use_aggressive_assign = options.scheduler.aggressive_assign

succ_futures = []
# update pred & succ finish records to trigger further actions
# record if successors can be executed
for out_key in self._succ_keys:
futures.append(self._get_operand_actor(out_key).add_finished_predecessor(
self._op_key, self.worker, output_sizes=self._data_sizes,
_tell=True, _wait=False))
succ_futures.append(self._get_operand_actor(out_key).add_finished_predecessor(
self._op_key, self.worker, output_sizes=self._data_sizes, _wait=False))

pred_futures = []
for in_key in self._pred_keys:
futures.append(self._get_operand_actor(in_key).add_finished_successor(
pred_futures.append(self._get_operand_actor(in_key).add_finished_successor(
self._op_key, self.worker, _tell=True, _wait=False))
# require more chunks to execute if the completion caused no successors to run
if self._is_terminal:
# update records in GraphActor to help decide if the whole graph finished execution
futures.extend(self._add_finished_terminal())
[f.result() for f in futures]
pred_futures.extend(self._add_finished_terminal())
[f.result() for f in pred_futures]

if use_aggressive_assign and not any(f.result() for f in succ_futures):
self._assigner_ref.allocate_top_resources(1, _tell=True)

@log_unhandled
def _on_fatal(self):
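
Tying the pieces together: add_finished_predecessor now returns whether the successor is (or just became) ready, and _on_finished collects those return values so that, with scheduler.aggressive_assign enabled, an operand whose completion unblocked nothing asks the assigner for one more enqueued operand. A simplified sketch of that decision (plain functions standing in for the actor calls, not the actual implementation):

def on_operand_finished(successor_ready_flags, aggressive_assign, assigner_ref):
    # successor_ready_flags: one bool per successor, as returned by
    # add_finished_predecessor
    if aggressive_assign and not any(successor_ready_flags):
        # nothing downstream can run yet, so pull one more operand from the queue
        assigner_ref.allocate_top_resources(1)
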
