From b1a5a9743f82a8acc04f6ab63abb67903da23ff5 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Wed, 9 Dec 2020 23:41:28 +0800 Subject: [PATCH] Spawn serialization of executable graphs (#1769) --- mars/dataframe/reduction/aggregation.py | 4 ++++ mars/dataframe/reduction/core.py | 10 +++++++++- mars/deploy/local/tests/test_cluster.py | 2 +- mars/scheduler/graph.py | 19 ++++++++++++++++++- mars/scheduler/operands/base.py | 2 -- 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/mars/dataframe/reduction/aggregation.py b/mars/dataframe/reduction/aggregation.py index 8fe0b64f3b..3660188d60 100644 --- a/mars/dataframe/reduction/aggregation.py +++ b/mars/dataframe/reduction/aggregation.py @@ -605,7 +605,11 @@ def _execute_map(cls, ctx, op: "DataFrameAggregate"): # map according to map groups ret_map_dfs = dict() + in_cols_set = set(in_data.columns) if in_data.ndim == 2 else None for input_key, output_key, cols, func in op.pre_funcs: + if cols and in_cols_set == set(cols): + cols = None + src_df = in_data if cols is None else in_data[cols] if input_key == output_key: ret_map_dfs[output_key] = src_df diff --git a/mars/dataframe/reduction/core.py b/mars/dataframe/reduction/core.py index a42ae33da9..41ca7d06fd 100644 --- a/mars/dataframe/reduction/core.py +++ b/mars/dataframe/reduction/core.py @@ -857,8 +857,11 @@ def _interpret_var(v): def compile(self) -> ReductionSteps: pre_funcs, agg_funcs, post_funcs = [], [], [] + referred_cols = set() for key, step in self._output_key_to_pre_steps.items(): cols = self._output_key_to_pre_cols[key] + if cols: + referred_cols.update(cols) pre_funcs.append(ReductionPreStep( step.input_key, step.output_key, cols, step.func)) @@ -867,7 +870,12 @@ def compile(self) -> ReductionSteps: for key, step in self._output_key_to_post_steps.items(): cols = self._output_key_to_post_cols[key] + if cols and set(cols) == set(referred_cols): + post_cols = None + else: + post_cols = cols + post_funcs.append(ReductionPostStep( - step.input_keys, step.output_key, step.func_name, cols, step.func)) + step.input_keys, step.output_key, step.func_name, post_cols, step.func)) return ReductionSteps(pre_funcs, agg_funcs, post_funcs) diff --git a/mars/deploy/local/tests/test_cluster.py b/mars/deploy/local/tests/test_cluster.py index 102071070a..d26d0454bd 100644 --- a/mars/deploy/local/tests/test_cluster.py +++ b/mars/deploy/local/tests/test_cluster.py @@ -603,7 +603,7 @@ def wrapped(*arg, **kwargs): expected4 = raw.groupby('c2').transform(func) pd.testing.assert_frame_equal(r4.sort_index(), expected4.sort_index()) - # test rerun gropuby + # test rerun groupby df = md.DataFrame(raw.copy(), chunk_size=4) r5 = session.run(df.groupby('c2').count(method='shuffle').max()) r6 = session.run(df.groupby('c2').count(method='shuffle').min()) diff --git a/mars/scheduler/graph.py b/mars/scheduler/graph.py index 848bc986f5..14deabffa1 100644 --- a/mars/scheduler/graph.py +++ b/mars/scheduler/graph.py @@ -893,6 +893,10 @@ def create_operand_actors(self, _clean_info=True, _start=True): meta_op_infos = dict() initial_keys = [] to_allocate_op_keys = set() + + total_ops = len(self._op_key_to_chunk) + processed_ops = 0 + last_progress = 0 for op_key in self._op_key_to_chunk: chunks = self._op_key_to_chunk[op_key] op = chunks[0].op @@ -908,7 +912,8 @@ def create_operand_actors(self, _clean_info=True, _start=True): io_meta = self._collect_operand_io_meta(chunk_graph, chunks) op_info['op_name'] = meta_op_info['op_name'] = op_name op_info['io_meta'] = io_meta - op_info['executable_dag'] = self.get_executable_operand_dag(op_key) + op_info['executable_dag'] = self._graph_analyze_pool.submit( + self.get_executable_operand_dag, op_key).result() # todo change this when other calc devices supported op_info['calc_device'] = 'cuda' if op.gpu else 'cpu' @@ -946,17 +951,29 @@ def create_operand_actors(self, _clean_info=True, _start=True): op_info.pop('executable_dag', None) del op_info['io_meta'] + processed_ops += 1 + progress = processed_ops * 1.0 / total_ops + if int(progress * 20) > last_progress: + last_progress = int(progress * 20) + logger.info('Operand actor creation progress: %d / %d', processed_ops, total_ops) + self.state = GraphState.RUNNING self._graph_meta_ref.update_op_infos(meta_op_infos, _tell=True, _wait=False) if _start: existing_keys = [] + created_keys = [] for op_key, future in op_refs.items(): try: op_refs[op_key] = future.result() + created_keys.append(op_key) except ActorAlreadyExist: existing_keys.append(op_key) + # start operands when all operands are created + for op_key in created_keys: + op_refs[op_key].start_operand(_tell=True, _wait=False) + append_futures = [] for op_key in existing_keys: chunks = self._op_key_to_chunk[op_key] diff --git a/mars/scheduler/operands/base.py b/mars/scheduler/operands/base.py index e82d813e88..c96f914e39 100644 --- a/mars/scheduler/operands/base.py +++ b/mars/scheduler/operands/base.py @@ -94,8 +94,6 @@ def post_create(self): if self._with_kvstore: self._kv_store_ref = self.ctx.actor_ref(KVStoreActor.default_uid()) - self.ref().start_operand(_tell=True) - def pre_destroy(self): self.unset_cluster_info_ref()