Spawn serialization of executable graphs #1769

Merged
merged 2 commits into from Dec 9, 2020
4 changes: 4 additions & 0 deletions mars/dataframe/reduction/aggregation.py
@@ -605,7 +605,11 @@ def _execute_map(cls, ctx, op: "DataFrameAggregate"):

# map according to map groups
ret_map_dfs = dict()
in_cols_set = set(in_data.columns) if in_data.ndim == 2 else None
for input_key, output_key, cols, func in op.pre_funcs:
if cols and in_cols_set == set(cols):
cols = None

src_df = in_data if cols is None else in_data[cols]
if input_key == output_key:
ret_map_dfs[output_key] = src_df
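Note on the hunk above: when a pre-aggregation function's column list already covers every column of the incoming chunk, the added check drops the column selection and reuses the input frame as-is. A minimal, pandas-only sketch of that shortcut, with a hypothetical helper name (select_source is not the actual Mars API):

import pandas as pd

def select_source(in_data: pd.DataFrame, cols):
    # Return the frame a pre-aggregation function should operate on.
    in_cols_set = set(in_data.columns) if in_data.ndim == 2 else None
    # When the requested columns are exactly the input's columns,
    # skip the __getitem__ call and reuse the original frame.
    if cols and in_cols_set == set(cols):
        cols = None
    return in_data if cols is None else in_data[cols]

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
assert select_source(df, ['b', 'a']) is df                  # full column set: no slicing
assert list(select_source(df, ['a']).columns) == ['a']      # partial set: sliced as before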
10 changes: 9 additions & 1 deletion mars/dataframe/reduction/core.py
@@ -857,8 +857,11 @@ def _interpret_var(v):

def compile(self) -> ReductionSteps:
pre_funcs, agg_funcs, post_funcs = [], [], []
referred_cols = set()
for key, step in self._output_key_to_pre_steps.items():
cols = self._output_key_to_pre_cols[key]
if cols:
referred_cols.update(cols)
pre_funcs.append(ReductionPreStep(
step.input_key, step.output_key, cols, step.func))

@@ -867,7 +870,12 @@ def compile(self) -> ReductionSteps:

for key, step in self._output_key_to_post_steps.items():
cols = self._output_key_to_post_cols[key]
if cols and set(cols) == set(referred_cols):
post_cols = None
else:
post_cols = cols

post_funcs.append(ReductionPostStep(
step.input_keys, step.output_key, step.func_name, cols, step.func))
step.input_keys, step.output_key, step.func_name, post_cols, step.func))

return ReductionSteps(pre_funcs, agg_funcs, post_funcs)
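Note on the hunks above: compile() now accumulates every column referred to by the pre-steps, and a post-step whose column list equals that full set is emitted with None for its columns, so no redundant column selection happens downstream. A rough sketch of the bookkeeping under simplified assumptions (plain dicts stand in for the _output_key_to_pre_cols / _output_key_to_post_cols attributes):

def resolve_post_cols(pre_cols_by_key: dict, post_cols_by_key: dict) -> dict:
    referred_cols = set()
    for cols in pre_cols_by_key.values():
        if cols:
            referred_cols.update(cols)

    resolved = {}
    for key, cols in post_cols_by_key.items():
        # A post step that touches exactly the referred columns gets None,
        # so the executor skips the extra column selection.
        if cols and set(cols) == referred_cols:
            resolved[key] = None
        else:
            resolved[key] = cols
    return resolved

# Pre steps refer to {'a', 'b'}; the post step on ['a', 'b'] needs no slice.
assert resolve_post_cols({'k1': ['a', 'b']}, {'p1': ['a', 'b'], 'p2': ['a']}) == \
    {'p1': None, 'p2': ['a']}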
2 changes: 1 addition & 1 deletion mars/deploy/local/tests/test_cluster.py
@@ -603,7 +603,7 @@ def wrapped(*arg, **kwargs):
expected4 = raw.groupby('c2').transform(func)
pd.testing.assert_frame_equal(r4.sort_index(), expected4.sort_index())

# test rerun gropuby
# test rerun groupby
df = md.DataFrame(raw.copy(), chunk_size=4)
r5 = session.run(df.groupby('c2').count(method='shuffle').max())
r6 = session.run(df.groupby('c2').count(method='shuffle').min())
19 changes: 18 additions & 1 deletion mars/scheduler/graph.py
@@ -893,6 +893,10 @@ def create_operand_actors(self, _clean_info=True, _start=True):
meta_op_infos = dict()
initial_keys = []
to_allocate_op_keys = set()

total_ops = len(self._op_key_to_chunk)
processed_ops = 0
last_progress = 0
for op_key in self._op_key_to_chunk:
chunks = self._op_key_to_chunk[op_key]
op = chunks[0].op
@@ -908,7 +912,8 @@
io_meta = self._collect_operand_io_meta(chunk_graph, chunks)
op_info['op_name'] = meta_op_info['op_name'] = op_name
op_info['io_meta'] = io_meta
op_info['executable_dag'] = self.get_executable_operand_dag(op_key)
op_info['executable_dag'] = self._graph_analyze_pool.submit(
self.get_executable_operand_dag, op_key).result()
# todo change this when other calc devices supported
op_info['calc_device'] = 'cuda' if op.gpu else 'cpu'

@@ -946,17 +951,29 @@
op_info.pop('executable_dag', None)
del op_info['io_meta']

processed_ops += 1
progress = processed_ops * 1.0 / total_ops
if int(progress * 20) > last_progress:
last_progress = int(progress * 20)
logger.info('Operand actor creation progress: %d / %d', processed_ops, total_ops)

self.state = GraphState.RUNNING
self._graph_meta_ref.update_op_infos(meta_op_infos, _tell=True, _wait=False)

if _start:
existing_keys = []
created_keys = []
for op_key, future in op_refs.items():
try:
op_refs[op_key] = future.result()
created_keys.append(op_key)
except ActorAlreadyExist:
existing_keys.append(op_key)

# start operands when all operands are created
for op_key in created_keys:
op_refs[op_key].start_operand(_tell=True, _wait=False)

append_futures = []
for op_key in existing_keys:
chunks = self._op_key_to_chunk[op_key]
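Note on the hunks above, which carry the change named in the PR title: the executable DAG of each operand is now built by submitting get_executable_operand_dag to the graph-analyze pool instead of calling it inline, and actor-creation progress is logged each time another 1/20 of the operands has been processed. A hedged approximation of the pattern using concurrent.futures (Mars' _graph_analyze_pool is an actor-backed pool rather than a ThreadPoolExecutor, and build_executable_dag is a hypothetical stand-in for get_executable_operand_dag):

import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

def build_operand_infos(op_keys, build_executable_dag, pool: ThreadPoolExecutor):
    total_ops, processed_ops, last_progress = len(op_keys), 0, 0
    infos = {}
    for op_key in op_keys:
        # Spawn serialization of the executable graph on the pool; in the
        # real scheduler this moves the heavy pickling work off the graph
        # actor's process, while .result() keeps the original ordering.
        infos[op_key] = {
            'executable_dag': pool.submit(build_executable_dag, op_key).result(),
        }

        processed_ops += 1
        progress = processed_ops * 1.0 / total_ops
        # Emit a log line whenever another 5% of the operands is finished.
        if int(progress * 20) > last_progress:
            last_progress = int(progress * 20)
            logger.info('Operand actor creation progress: %d / %d',
                        processed_ops, total_ops)
    return infos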
2 changes: 0 additions & 2 deletions mars/scheduler/operands/base.py
@@ -94,8 +94,6 @@ def post_create(self):
if self._with_kvstore:
self._kv_store_ref = self.ctx.actor_ref(KVStoreActor.default_uid())

self.ref().start_operand(_tell=True)

def pre_destroy(self):
self.unset_cluster_info_ref()

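Note on the removal above: with the self-start call gone from post_create, operand actors no longer start themselves at creation time; instead the graph actor creates every operand actor first and then explicitly starts the newly created ones (the created_keys loop added in graph.py). A simplified sketch of that create-all-then-start ordering, with create_actor standing in for the real actor-creation call (ActorAlreadyExist and start_operand are the names used in the diff):

class ActorAlreadyExist(Exception):
    pass

def create_then_start(op_keys, create_actor):
    op_refs, created_keys, existing_keys = {}, [], []
    for op_key in op_keys:
        try:
            op_refs[op_key] = create_actor(op_key)
            created_keys.append(op_key)
        except ActorAlreadyExist:
            existing_keys.append(op_key)

    # Start operands only after every operand actor exists, instead of each
    # actor kicking itself off inside post_create().
    for op_key in created_keys:
        op_refs[op_key].start_operand()
    return op_refs, created_keys, existing_keys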