Skip to content

Commit

Permalink
Spawn serialization of executable graphs (mars-project#1769)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Dec 10, 2020
1 parent 09593ed commit b1a5a97
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 5 deletions.
4 changes: 4 additions & 0 deletions mars/dataframe/reduction/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,11 @@ def _execute_map(cls, ctx, op: "DataFrameAggregate"):

# map according to map groups
ret_map_dfs = dict()
in_cols_set = set(in_data.columns) if in_data.ndim == 2 else None
for input_key, output_key, cols, func in op.pre_funcs:
if cols and in_cols_set == set(cols):
cols = None

src_df = in_data if cols is None else in_data[cols]
if input_key == output_key:
ret_map_dfs[output_key] = src_df
Expand Down
10 changes: 9 additions & 1 deletion mars/dataframe/reduction/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,11 @@ def _interpret_var(v):

def compile(self) -> ReductionSteps:
pre_funcs, agg_funcs, post_funcs = [], [], []
referred_cols = set()
for key, step in self._output_key_to_pre_steps.items():
cols = self._output_key_to_pre_cols[key]
if cols:
referred_cols.update(cols)
pre_funcs.append(ReductionPreStep(
step.input_key, step.output_key, cols, step.func))

Expand All @@ -867,7 +870,12 @@ def compile(self) -> ReductionSteps:

for key, step in self._output_key_to_post_steps.items():
cols = self._output_key_to_post_cols[key]
if cols and set(cols) == set(referred_cols):
post_cols = None
else:
post_cols = cols

post_funcs.append(ReductionPostStep(
step.input_keys, step.output_key, step.func_name, cols, step.func))
step.input_keys, step.output_key, step.func_name, post_cols, step.func))

return ReductionSteps(pre_funcs, agg_funcs, post_funcs)
2 changes: 1 addition & 1 deletion mars/deploy/local/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ def wrapped(*arg, **kwargs):
expected4 = raw.groupby('c2').transform(func)
pd.testing.assert_frame_equal(r4.sort_index(), expected4.sort_index())

# test rerun gropuby
# test rerun groupby
df = md.DataFrame(raw.copy(), chunk_size=4)
r5 = session.run(df.groupby('c2').count(method='shuffle').max())
r6 = session.run(df.groupby('c2').count(method='shuffle').min())
Expand Down
19 changes: 18 additions & 1 deletion mars/scheduler/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,10 @@ def create_operand_actors(self, _clean_info=True, _start=True):
meta_op_infos = dict()
initial_keys = []
to_allocate_op_keys = set()

total_ops = len(self._op_key_to_chunk)
processed_ops = 0
last_progress = 0
for op_key in self._op_key_to_chunk:
chunks = self._op_key_to_chunk[op_key]
op = chunks[0].op
Expand All @@ -908,7 +912,8 @@ def create_operand_actors(self, _clean_info=True, _start=True):
io_meta = self._collect_operand_io_meta(chunk_graph, chunks)
op_info['op_name'] = meta_op_info['op_name'] = op_name
op_info['io_meta'] = io_meta
op_info['executable_dag'] = self.get_executable_operand_dag(op_key)
op_info['executable_dag'] = self._graph_analyze_pool.submit(
self.get_executable_operand_dag, op_key).result()
# todo change this when other calc devices supported
op_info['calc_device'] = 'cuda' if op.gpu else 'cpu'

Expand Down Expand Up @@ -946,17 +951,29 @@ def create_operand_actors(self, _clean_info=True, _start=True):
op_info.pop('executable_dag', None)
del op_info['io_meta']

processed_ops += 1
progress = processed_ops * 1.0 / total_ops
if int(progress * 20) > last_progress:
last_progress = int(progress * 20)
logger.info('Operand actor creation progress: %d / %d', processed_ops, total_ops)

self.state = GraphState.RUNNING
self._graph_meta_ref.update_op_infos(meta_op_infos, _tell=True, _wait=False)

if _start:
existing_keys = []
created_keys = []
for op_key, future in op_refs.items():
try:
op_refs[op_key] = future.result()
created_keys.append(op_key)
except ActorAlreadyExist:
existing_keys.append(op_key)

# start operands when all operands are created
for op_key in created_keys:
op_refs[op_key].start_operand(_tell=True, _wait=False)

append_futures = []
for op_key in existing_keys:
chunks = self._op_key_to_chunk[op_key]
Expand Down
2 changes: 0 additions & 2 deletions mars/scheduler/operands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ def post_create(self):
if self._with_kvstore:
self._kv_store_ref = self.ctx.actor_ref(KVStoreActor.default_uid())

self.ref().start_operand(_tell=True)

def pre_destroy(self):
self.unset_cluster_info_ref()

Expand Down

0 comments on commit b1a5a97

Please sign in to comment.