From ac94242ea993948e8e6bb54d961d36794c918864 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Thu, 22 Mar 2018 22:55:21 +0800 Subject: [PATCH 1/5] change boost download url to speed up download --- cmake/external/boost.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/external/boost.cmake b/cmake/external/boost.cmake index d9cd264b49d54..10662fc967046 100644 --- a/cmake/external/boost.cmake +++ b/cmake/external/boost.cmake @@ -24,7 +24,7 @@ set(BOOST_PROJECT "extern_boost") # So we use 1.41.0 here. set(BOOST_VER "1.41.0") set(BOOST_TAR "boost_1_41_0") -set(BOOST_URL "http://paddlepaddledeps.s3-website-us-west-1.amazonaws.com/${BOOST_TAR}.tar.gz") +set(BOOST_URL "http://paddlepaddledeps.bj.bcebos.com/${BOOST_TAR}.tar.gz") set(BOOST_SOURCES_DIR ${THIRD_PARTY_PATH}/boost) set(BOOST_DOWNLOAD_DIR "${BOOST_SOURCES_DIR}/src/${BOOST_PROJECT}") set(BOOST_INCLUDE_DIR "${BOOST_DOWNLOAD_DIR}/${BOOST_TAR}" CACHE PATH "boost include directory." FORCE) From 76ae540f8ef3dc5463da6127556fc48a343698c9 Mon Sep 17 00:00:00 2001 From: Varun Arora Date: Thu, 22 Mar 2018 10:44:43 -0700 Subject: [PATCH 2/5] Move Select to concurrency.py; incorporate outputs (#9136) * Move Select to concurrency.py; incorporate outputs * CLang formatting for concurrency * Remove extra bracket - formatting fix - 3 * Comment fix --- paddle/fluid/framework/concurrency_test.cc | 10 +- paddle/fluid/operators/select_op.cc | 5 + python/paddle/fluid/concurrency.py | 182 +++++++++++++++++++- python/paddle/fluid/layers/control_flow.py | 183 +-------------------- 4 files changed, 192 insertions(+), 188 deletions(-) diff --git a/paddle/fluid/framework/concurrency_test.cc b/paddle/fluid/framework/concurrency_test.cc index 25152054eb845..e98e9d94bf71f 100644 --- a/paddle/fluid/framework/concurrency_test.cc +++ b/paddle/fluid/framework/concurrency_test.cc @@ -150,8 +150,9 @@ void AddFibonacciSelect(Scope *scope, p::CPUPlace *place, ProgramDesc *program, // Select block AddOp("select", {{"X", {dataChanName, quitChanName}}, {"case_to_execute", {"caseToExecute"}}}, - {}, {{"sub_block", casesBlock}, - {"cases", std::vector{case0Config, case1Config}}}, + {{"Out", {}}}, + {{"sub_block", casesBlock}, + {"cases", std::vector{case0Config, case1Config}}}, whileBlock); scope->Var("stepScopes"); @@ -209,9 +210,8 @@ TEST(Concurrency, Go_Op) { executor.Run(program, &scope, 0, true, true); - // After we call executor.run, the Go operator should do a channel_send to set - // the - // "result" variable to 99 + // After we call executor.run, the Go operator should do a channel_send to + // set the "result" variable to 99. auto *finalData = tensor.data(); EXPECT_EQ(finalData[0], 99); } diff --git a/paddle/fluid/operators/select_op.cc b/paddle/fluid/operators/select_op.cc index 8344a239df7b3..c0bf0ff927481 100644 --- a/paddle/fluid/operators/select_op.cc +++ b/paddle/fluid/operators/select_op.cc @@ -27,6 +27,7 @@ namespace operators { static constexpr char kX[] = "X"; static constexpr char kCaseToExecute[] = "case_to_execute"; +static constexpr char kOutputs[] = "Out"; static constexpr char kCases[] = "cases"; static constexpr char kCasesBlock[] = "sub_block"; @@ -388,6 +389,10 @@ class SelectOpMaker : public framework::OpProtoAndCheckerMaker { "(Int) The variable the sets the index of the case to execute, " "after evaluating the channels being sent to and received from") .AsDuplicable(); + AddOutput(kOutputs, + "A set of variables, which will be assigned with values " + "generated by the operators inside the cases of Select Op.") + .AsDuplicable(); AddAttr>(kCases, "(String vector) Serialized list of" "all cases in the select op. Each" diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index 3e4292d23550b..d65e1a6858373 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from layers.control_flow import BlockGuard, Select +from layers.control_flow import BlockGuard, equal +from .framework import Operator from layer_helper import LayerHelper, unique_name from layers import fill_constant import core @@ -75,6 +76,185 @@ def construct_go_op(self): attrs={'sub_block': go_block}) +class SelectCase(object): + DEFAULT = 0 + SEND = 1 + RECEIVE = 2 + + def __init__(self, + case_idx, + case_to_execute, + channel_action_fn=None, + channel=None, + value=None): + self.helper = LayerHelper('conditional_block') + self.main_program = self.helper.main_program + self.is_scalar_condition = True + + self.case_to_execute = case_to_execute + self.idx = case_idx + + # Since we aren't going to use the `channel_send` or `channel_recv` + # functions directly, we just need to capture the name. + self.action = (self.SEND + if channel_action_fn.__name__ == ('channel_send') else + self.RECEIVE) if channel_action_fn else self.DEFAULT + self.value = value + self.channel = channel + + def __enter__(self): + self.block = self.main_program.create_block() + + def construct_op(self): + main_program = self.helper.main_program + cases_block = main_program.current_block() + + inner_outputs = set() + input_set = set() + params = set() + + for op in self.block.ops: + # Iterate over all operators, get all the inputs + # and add as input to the SelectCase operator. + for iname in op.input_names: + for in_var_name in op.input(iname): + if in_var_name not in inner_outputs: + input_set.add(in_var_name) + + for oname in op.output_names: + for out_var_name in op.output(oname): + inner_outputs.add(out_var_name) + + param_list = [ + cases_block.var(each_name) for each_name in params + if each_name not in input_set + ] + + # Iterate over all operators, get all the outputs + # add to the output list of SelectCase operator only if + # they exist in the parent block. + out_vars = [] + for inner_out_name in inner_outputs: + if inner_out_name in cases_block.vars: + out_vars.append(cases_block.var(inner_out_name)) + + # First, create an op that will determine whether or not this is the + # conditional variable to execute. + should_execute_block = equal( + fill_constant( + shape=[1], dtype=core.VarDesc.VarType.INT32, value=self.idx), + self.case_to_execute) + + step_scope = cases_block.create_var( + type=core.VarDesc.VarType.STEP_SCOPES) + + cases_block.append_op( + type='conditional_block', + inputs={'X': [should_execute_block], + 'Params': param_list}, + outputs={'Out': out_vars, + 'Scope': [step_scope]}, + attrs={ + 'sub_block': self.block, + 'is_scalar_condition': self.is_scalar_condition + }) + + return '%s,%s,%s,%s' % (self.idx, self.action, self.channel.name + if self.channel else '', self.value.name + if self.value else '') + + def __exit__(self, exc_type, exc_val, exc_tb): + self.main_program.rollback() + if exc_type is not None: + return False # re-raise exception + return True + + +class Select(BlockGuard): + def __init__(self, name=None): + self.helper = LayerHelper('select', name=name) + self.cases = [] + + super(Select, self).__init__(self.helper.main_program) + self.case_to_execute = fill_constant( + shape=[1], dtype=core.VarDesc.VarType.INT32, value=-1) + + def __enter__(self): + super(Select, self).__enter__() + return self + + def case(self, channel_action_fn, channel, value): + """Create a new block for this condition. + """ + select_case = SelectCase( + len(self.cases), self.case_to_execute, channel_action_fn, channel, + value) + + self.cases.append(select_case) + + return select_case + + def default(self): + """Create a default case block for this condition. + """ + default_case = SelectCase(len(self.cases), self.case_to_execute) + + self.cases.append(default_case) + + return default_case + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return False + + # Create a select op and another block to wrap its + # case blocks. + select_block = self.helper.main_program.current_block() + parent_block = self.helper.main_program.block(select_block.parent_idx) + + # Construct each case op, inside the newly created select block. + serialized_cases = [] + for case in self.cases: + serialized_cases.append(case.construct_op()) + + intermediate = set() + params = set() + + for case_block in select_block.ops: + if case_block.attrs and 'sub_block' in case_block.attrs: + for each_op in case_block.attrs['sub_block'].ops: + assert isinstance(each_op, Operator) + for iname in each_op.input_names: + for in_var_name in each_op.input(iname): + if in_var_name not in intermediate: + params.add(in_var_name) + + for oname in each_op.output_names: + for out_var_name in each_op.output(oname): + intermediate.add(out_var_name) + + out_list = [ + parent_block.var(var_name) for var_name in parent_block.vars + if var_name in intermediate + ] + + X = [select_block.var_recursive(x_name) for x_name in params] + + # Needs to be used by `equal` inside the cases block. + X.append(self.case_to_execute) + + # Construct the select op. + parent_block.append_op( + type='select', + inputs={'X': X, + 'case_to_execute': self.case_to_execute}, + attrs={'sub_block': select_block, + 'cases': serialized_cases}, + outputs={'Out': out_list}) + + return super(Select, self).__exit__(exc_type, exc_val, exc_tb) + + def make_channel(dtype, capacity=0): """ Helps implementation of a concurrent program by creating a "channel" of diff --git a/python/paddle/fluid/layers/control_flow.py b/python/paddle/fluid/layers/control_flow.py index 02cd0a05a11d8..1bb1aa30ee101 100644 --- a/python/paddle/fluid/layers/control_flow.py +++ b/python/paddle/fluid/layers/control_flow.py @@ -16,7 +16,7 @@ from layer_function_generator import autodoc from tensor import assign, fill_constant from .. import core -from ..framework import Program, Variable, Operator, Block +from ..framework import Program, Variable, Operator from ..layer_helper import LayerHelper, unique_name from ops import logical_and, logical_not, logical_or @@ -29,7 +29,6 @@ 'WhileGuard', 'While', 'Switch', - 'Select', 'lod_rank_table', 'max_sequence_len', 'topk', @@ -1212,186 +1211,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): return True -class SelectCase(object): - DEFAULT = 0 - SEND = 1 - RECEIVE = 2 - - def __init__(self, - case_idx, - case_to_execute, - channel_action_fn=None, - channel=None, - value=None): - self.helper = LayerHelper('conditional_block') - self.main_program = self.helper.main_program - self.is_scalar_condition = True - - self.case_to_execute = case_to_execute - self.idx = case_idx - - # Since we aren't going to use the `channel_send` or `channel_recv` - # functions directly, we just need to capture the name. - self.action = (self.SEND - if channel_action_fn.__name__ == ('channel_send') else - self.RECEIVE) if channel_action_fn else (self.DEFAULT) - self.value = value - self.channel = channel - - def __enter__(self): - self.block = self.main_program.create_block() - - def construct_op(self): - main_program = self.helper.main_program - cases_block = main_program.current_block() - - inner_outputs = set() - input_set = set() - params = set() - - for op in self.block.ops: - # Iterate over all operators, get all the inputs - # and add as input to the SelectCase operator. - for iname in op.input_names: - for in_var_name in op.input(iname): - if in_var_name not in inner_outputs: - input_set.add(in_var_name) - - for oname in op.output_names: - for out_var_name in op.output(oname): - inner_outputs.add(out_var_name) - - param_list = [ - cases_block.var(each_name) for each_name in params - if each_name not in input_set - ] - - # Iterate over all operators, get all the outputs - # add to the output list of SelectCase operator only if - # they exist in the parent block. - out_vars = [] - for inner_out_name in inner_outputs: - if inner_out_name in cases_block.vars: - out_vars.append(cases_block.var(inner_out_name)) - - # First, create an op that will determine whether or not this is the - # conditional variable to execute. - should_execute_block = equal( - fill_constant( - shape=[1], dtype=core.VarDesc.VarType.INT32, value=self.idx), - self.case_to_execute) - - step_scope = cases_block.create_var( - type=core.VarDesc.VarType.STEP_SCOPES) - - cases_block.append_op( - type='conditional_block', - inputs={'X': [should_execute_block], - 'Params': param_list}, - outputs={'Out': out_vars, - 'Scope': [step_scope]}, - attrs={ - 'sub_block': self.block, - 'is_scalar_condition': self.is_scalar_condition - }) - - return '%s,%s,%s,%s' % (self.idx, self.action, self.channel.name - if self.channel else '', self.value.name - if self.value else '') - - def __exit__(self, exc_type, exc_val, exc_tb): - self.main_program.rollback() - if exc_type is not None: - return False # re-raise exception - return True - - -class Select(BlockGuard): - def __init__(self, name=None): - self.helper = LayerHelper('select', name=name) - self.cases = [] - - super(Select, self).__init__(self.helper.main_program) - self.case_to_execute = fill_constant( - shape=[1], dtype=core.VarDesc.VarType.INT32, value=-1) - - def __enter__(self): - super(Select, self).__enter__() - return self - - def case(self, channel_action_fn, channel, value): - """Create a new block for this condition. - """ - select_case = SelectCase( - len(self.cases), self.case_to_execute, channel_action_fn, channel, - value) - - self.cases.append(select_case) - - return select_case - - def default(self): - """Create a default case block for this condition. - """ - default_case = SelectCase(len(self.cases), self.case_to_execute) - - self.cases.append(default_case) - - return default_case - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None: - return False - - # Create a select op and another block to wrap its - # case blocks. - select_block = self.helper.main_program.current_block() - parent_block = self.helper.main_program.block(select_block.parent_idx) - - # Construct each case op, inside the newly created select block. - serialized_cases = [] - for case in self.cases: - serialized_cases.append(case.construct_op()) - - intermediate = set() - params = set() - - for case_block in select_block.ops: - if case_block.attrs and 'sub_block' in case_block.attrs: - for each_op in case_block.attrs['sub_block'].ops: - assert isinstance(each_op, Operator) - for iname in each_op.input_names: - for in_var_name in each_op.input(iname): - if in_var_name not in intermediate: - params.add(in_var_name) - - for oname in each_op.output_names: - for out_var_name in each_op.output(oname): - intermediate.add(out_var_name) - - # TODO(varunarora): Figure out if defining output is needed. - out_list = [ - parent_block.var(var_name) for var_name in parent_block.vars - if var_name in intermediate - ] - - X = [select_block.var_recursive(x_name) for x_name in params] - - # Needs to be used by `equal` inside the cases block. - X.append(self.case_to_execute) - - # Construct the select op. - parent_block.append_op( - type='select', - inputs={'X': X, - 'case_to_execute': self.case_to_execute}, - attrs={'sub_block': select_block, - 'cases': serialized_cases}, - outputs={}) - - return super(Select, self).__exit__(exc_type, exc_val, exc_tb) - - class IfElseBlockGuard(object): def __init__(self, is_true, ifelse): if not isinstance(ifelse, IfElse): From a9a228ad8dc30e2341e0e64b6cb053dc116578e6 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 23 Mar 2018 18:40:22 +0800 Subject: [PATCH 3/5] fix dist compile --- paddle/fluid/operators/detail/grpc_server.h | 2 ++ paddle/fluid/operators/detail/test_serde.cc | 10 ++++----- paddle/fluid/operators/listen_and_serv_op.cc | 22 +++++++------------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index f891c75dbc81a..787e1506e28aa 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -25,6 +25,8 @@ limitations under the License. */ #include "paddle/fluid/operators/detail/grpc_service.h" #include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "paddle/fluid/operators/detail/simple_block_queue.h" +#include "paddle/fluid/operators/detail/sendrecvop_utils.h" namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 99c1577223c4f..e646c894d18d3 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -199,9 +199,9 @@ TEST(LodTensor, Run) { RunTestLodTensor(place); RunTestLodTensor(place, 1); #ifdef PADDLE_WITH_CUDA - platform::CUDAPlace place; - RunTestLodTensor(place); - RunTestLodTensor(place, 1); + platform::CUDAPlace gpu(0); + RunTestLodTensor(gpu); + RunTestLodTensor(gpu, 1); #endif } @@ -210,7 +210,7 @@ TEST(SelectedRows, Run) { RunSerdeTestSelectedRows(place); #ifdef PADDLE_WITH_CUDA - platform::CUDAPlace place; - RunSerdeTestSelectedRows(place); + platform::CUDAPlace gpu; + RunSerdeTestSelectedRows(gpu); #endif } diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index d8a3c45ac5bf6..9c788108e298d 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -93,12 +93,6 @@ class ListenAndServOp : public framework::OperatorBase { "server program should have at least 2 blocks"); framework::Executor executor(dev_place); - std::vector blk_ctx_list; - blk_ctx_list.push_back(nullptr); // block0 is not used. - for (int blkid = 1; blkid < num_blocks; ++blkid) { - auto *exe_ctx = executor.Prepare(*program, blkid); - blk_ctx_list.push_back(exe_ctx); - } // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; @@ -150,11 +144,11 @@ class ListenAndServOp : public framework::OperatorBase { // block0 contains only listen_and_serv op, start run from block1. for (int blkid = 1; blkid < num_blocks - 1; ++blkid) { fs.push_back(framework::Async( - [&executor, &program, &recv_scope, &blk_ctx_list, blkid]() { + [&executor, &program, &recv_scope, blkid]() { int run_block = blkid; // thread local try { - executor.RunPreparedContext(blk_ctx_list[run_block], - &recv_scope, false, false); + executor.Run(*program, &recv_scope, run_block, + false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -164,8 +158,8 @@ class ListenAndServOp : public framework::OperatorBase { // Run global block at final step, or block1 if there are only 2 blocks if (num_blocks >= 2) { try { - executor.RunPreparedContext(blk_ctx_list[num_blocks - 1], &recv_scope, - false, false); + executor.Run(*program, &recv_scope, num_blocks - 1, + false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -185,9 +179,9 @@ class ListenAndServOp : public framework::OperatorBase { sparse_vars.clear(); } // while(true) - for (int i = 0; i < num_blocks; ++i) { - delete blk_ctx_list[i]; - } + // for (int i = 0; i < num_blocks; ++i) { + // delete blk_ctx_list[i]; + // } } protected: From bb815d4364eaaf6c4053fc6c2259ebfa559bca90 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 23 Mar 2018 19:13:25 +0800 Subject: [PATCH 4/5] update --- .clang_format.hook | 2 +- paddle/fluid/operators/detail/grpc_server.h | 3 +-- paddle/fluid/operators/listen_and_serv_op.cc | 10 ++++------ 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/.clang_format.hook b/.clang_format.hook index 1d928216867c0..edec286b7738c 100755 --- a/.clang_format.hook +++ b/.clang_format.hook @@ -1,7 +1,7 @@ #!/bin/bash set -e -readonly VERSION="3.8" +readonly VERSION="7.0" version=$(clang-format -version) diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 787e1506e28aa..10e6dd45a901d 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -22,11 +22,10 @@ limitations under the License. */ #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/var_type.h" #include "paddle/fluid/operators/detail/grpc_service.h" -#include "paddle/fluid/operators/detail/grpc_service.h" #include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h" -#include "paddle/fluid/operators/detail/simple_block_queue.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/detail/simple_block_queue.h" namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 9c788108e298d..08b83375dd546 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -143,12 +143,11 @@ class ListenAndServOp : public framework::OperatorBase { std::vector> fs; // block0 contains only listen_and_serv op, start run from block1. for (int blkid = 1; blkid < num_blocks - 1; ++blkid) { - fs.push_back(framework::Async( - [&executor, &program, &recv_scope, blkid]() { + fs.push_back( + framework::Async([&executor, &program, &recv_scope, blkid]() { int run_block = blkid; // thread local try { - executor.Run(*program, &recv_scope, run_block, - false, false); + executor.Run(*program, &recv_scope, run_block, false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -158,8 +157,7 @@ class ListenAndServOp : public framework::OperatorBase { // Run global block at final step, or block1 if there are only 2 blocks if (num_blocks >= 2) { try { - executor.Run(*program, &recv_scope, num_blocks - 1, - false, false); + executor.Run(*program, &recv_scope, num_blocks - 1, false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } From bf66ce04940477375d8d605dcd8ece45ae2a4b61 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Fri, 23 Mar 2018 19:15:05 +0800 Subject: [PATCH 5/5] update --- .clang_format.hook | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.clang_format.hook b/.clang_format.hook index edec286b7738c..1d928216867c0 100755 --- a/.clang_format.hook +++ b/.clang_format.hook @@ -1,7 +1,7 @@ #!/bin/bash set -e -readonly VERSION="7.0" +readonly VERSION="3.8" version=$(clang-format -version)