Skip to content

Commit

Permalink
Merge commit 'c83dd9b4c2e4685319773d4bf6c2164c498cd1dc' into develop
Browse files Browse the repository at this point in the history
* commit 'c83dd9b4c2e4685319773d4bf6c2164c498cd1dc':
  update
  update
  fix dist compile
  Move Select to concurrency.py; incorporate outputs (PaddlePaddle#9136)
  change boost download url to speed up download
  • Loading branch information
mikeseven committed Mar 23, 2018
2 parents 8766a00 + c83dd9b commit 42a0798
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 210 deletions.
2 changes: 1 addition & 1 deletion cmake/external/boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ set(BOOST_PROJECT "extern_boost")
# So we use 1.41.0 here.
set(BOOST_VER "1.41.0")
set(BOOST_TAR "boost_1_41_0")
set(BOOST_URL "http://paddlepaddledeps.s3-website-us-west-1.amazonaws.com/${BOOST_TAR}.tar.gz")
set(BOOST_URL "http://paddlepaddledeps.bj.bcebos.com/${BOOST_TAR}.tar.gz")
set(BOOST_SOURCES_DIR ${THIRD_PARTY_PATH}/boost)
set(BOOST_DOWNLOAD_DIR "${BOOST_SOURCES_DIR}/src/${BOOST_PROJECT}")
set(BOOST_INCLUDE_DIR "${BOOST_DOWNLOAD_DIR}/${BOOST_TAR}" CACHE PATH "boost include directory." FORCE)
Expand Down
10 changes: 5 additions & 5 deletions paddle/fluid/framework/concurrency_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ void AddFibonacciSelect(Scope *scope, p::CPUPlace *place, ProgramDesc *program,
// Select block
AddOp("select", {{"X", {dataChanName, quitChanName}},
{"case_to_execute", {"caseToExecute"}}},
{}, {{"sub_block", casesBlock},
{"cases", std::vector<std::string>{case0Config, case1Config}}},
{{"Out", {}}},
{{"sub_block", casesBlock},
{"cases", std::vector<std::string>{case0Config, case1Config}}},
whileBlock);

scope->Var("stepScopes");
Expand Down Expand Up @@ -209,9 +210,8 @@ TEST(Concurrency, Go_Op) {

executor.Run(program, &scope, 0, true, true);

// After we call executor.run, the Go operator should do a channel_send to set
// the
// "result" variable to 99
// After we call executor.run, the Go operator should do a channel_send to
// set the "result" variable to 99.
auto *finalData = tensor.data<int>();
EXPECT_EQ(finalData[0], 99);
}
Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/operators/detail/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ limitations under the License. */
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/detail/grpc_service.h"
#include "paddle/fluid/operators/detail/grpc_service.h"
#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
#include "paddle/fluid/operators/detail/simple_block_queue.h"

namespace paddle {
namespace operators {
Expand Down
10 changes: 5 additions & 5 deletions paddle/fluid/operators/detail/test_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ TEST(LodTensor, Run) {
RunTestLodTensor(place);
RunTestLodTensor(place, 1);
#ifdef PADDLE_WITH_CUDA
platform::CUDAPlace place;
RunTestLodTensor(place);
RunTestLodTensor(place, 1);
platform::CUDAPlace gpu(0);
RunTestLodTensor(gpu);
RunTestLodTensor(gpu, 1);
#endif
}

Expand All @@ -210,7 +210,7 @@ TEST(SelectedRows, Run) {
RunSerdeTestSelectedRows(place);

#ifdef PADDLE_WITH_CUDA
platform::CUDAPlace place;
RunSerdeTestSelectedRows(place);
platform::CUDAPlace gpu;
RunSerdeTestSelectedRows(gpu);
#endif
}
22 changes: 7 additions & 15 deletions paddle/fluid/operators/listen_and_serv_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ class ListenAndServOp : public framework::OperatorBase {
"server program should have at least 2 blocks");

framework::Executor executor(dev_place);
std::vector<framework::ExecutorPrepareContext *> blk_ctx_list;
blk_ctx_list.push_back(nullptr); // block0 is not used.
for (int blkid = 1; blkid < num_blocks; ++blkid) {
auto *exe_ctx = executor.Prepare(*program, blkid);
blk_ctx_list.push_back(exe_ctx);
}

// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
Expand Down Expand Up @@ -149,12 +143,11 @@ class ListenAndServOp : public framework::OperatorBase {
std::vector<std::future<void>> fs;
// block0 contains only listen_and_serv op, start run from block1.
for (int blkid = 1; blkid < num_blocks - 1; ++blkid) {
fs.push_back(framework::Async(
[&executor, &program, &recv_scope, &blk_ctx_list, blkid]() {
fs.push_back(
framework::Async([&executor, &program, &recv_scope, blkid]() {
int run_block = blkid; // thread local
try {
executor.RunPreparedContext(blk_ctx_list[run_block],
&recv_scope, false, false);
executor.Run(*program, &recv_scope, run_block, false, false);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
Expand All @@ -164,8 +157,7 @@ class ListenAndServOp : public framework::OperatorBase {
// Run global block at final step, or block1 if there are only 2 blocks
if (num_blocks >= 2) {
try {
executor.RunPreparedContext(blk_ctx_list[num_blocks - 1], &recv_scope,
false, false);
executor.Run(*program, &recv_scope, num_blocks - 1, false, false);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
Expand All @@ -185,9 +177,9 @@ class ListenAndServOp : public framework::OperatorBase {
sparse_vars.clear();
} // while(true)

for (int i = 0; i < num_blocks; ++i) {
delete blk_ctx_list[i];
}
// for (int i = 0; i < num_blocks; ++i) {
// delete blk_ctx_list[i];
// }
}

protected:
Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/operators/select_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace operators {

static constexpr char kX[] = "X";
static constexpr char kCaseToExecute[] = "case_to_execute";
static constexpr char kOutputs[] = "Out";

static constexpr char kCases[] = "cases";
static constexpr char kCasesBlock[] = "sub_block";
Expand Down Expand Up @@ -388,6 +389,10 @@ class SelectOpMaker : public framework::OpProtoAndCheckerMaker {
"(Int) The variable the sets the index of the case to execute, "
"after evaluating the channels being sent to and received from")
.AsDuplicable();
AddOutput(kOutputs,
"A set of variables, which will be assigned with values "
"generated by the operators inside the cases of Select Op.")
.AsDuplicable();
AddAttr<std::vector<std::string>>(kCases,
"(String vector) Serialized list of"
"all cases in the select op. Each"
Expand Down
182 changes: 181 additions & 1 deletion python/paddle/fluid/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from layers.control_flow import BlockGuard, Select
from layers.control_flow import BlockGuard, equal
from .framework import Operator
from layer_helper import LayerHelper, unique_name
from layers import fill_constant
import core
Expand Down Expand Up @@ -75,6 +76,185 @@ def construct_go_op(self):
attrs={'sub_block': go_block})


class SelectCase(object):
    """Context manager describing a single case of a ``Select`` construct.

    Each case owns its own sub-block of the main program: the ops emitted
    inside the ``with`` body are recorded there, and ``construct_op`` later
    wraps that block in a ``conditional_block`` op which only executes when
    this case's index equals the runtime-chosen ``case_to_execute`` value.
    """

    # Case kinds, serialized into the select op's 'cases' attribute.
    DEFAULT = 0
    SEND = 1
    RECEIVE = 2

    def __init__(self,
                 case_idx,
                 case_to_execute,
                 channel_action_fn=None,
                 channel=None,
                 value=None):
        # case_idx: position of this case inside the enclosing Select.
        # case_to_execute: variable holding the case index chosen at runtime.
        # channel_action_fn: channel_send / channel_recv, or None for the
        #   default case.
        # channel / value: the channel operated on and the variable sent or
        #   received; both None for the default case.
        self.helper = LayerHelper('conditional_block')
        self.main_program = self.helper.main_program
        self.is_scalar_condition = True

        self.case_to_execute = case_to_execute
        self.idx = case_idx

        # Since we aren't going to use the `channel_send` or `channel_recv`
        # functions directly, we just need to capture the name.
        self.action = (self.SEND
                       if channel_action_fn.__name__ == ('channel_send') else
                       self.RECEIVE) if channel_action_fn else self.DEFAULT
        self.value = value
        self.channel = channel

    def __enter__(self):
        # Open a fresh block; ops appended inside the `with` body land here.
        # NOTE(review): does not return self, so `with select.case(...) as c`
        # binds None — confirm callers never rely on the bound value.
        self.block = self.main_program.create_block()

    def construct_op(self):
        """Wrap this case's block in a ``conditional_block`` op.

        Returns:
            A string ``'idx,action,channel_name,value_name'`` consumed by the
            select op's ``cases`` attribute (channel/value names are empty
            for the default case).
        """
        main_program = self.helper.main_program
        cases_block = main_program.current_block()

        inner_outputs = set()
        input_set = set()
        params = set()

        for op in self.block.ops:
            # Iterate over all operators, get all the inputs
            # and add as input to the SelectCase operator.
            for iname in op.input_names:
                for in_var_name in op.input(iname):
                    # A name already produced inside this block is an
                    # intermediate, not an external input.
                    if in_var_name not in inner_outputs:
                        input_set.add(in_var_name)

            for oname in op.output_names:
                for out_var_name in op.output(oname):
                    inner_outputs.add(out_var_name)

        # NOTE(review): `params` is never populated above, so `param_list`
        # is always empty here — confirm whether 'Params' is intentionally
        # unused or a latent bug.
        param_list = [
            cases_block.var(each_name) for each_name in params
            if each_name not in input_set
        ]

        # Iterate over all operators, get all the outputs
        # add to the output list of SelectCase operator only if
        # they exist in the parent block.
        out_vars = []
        for inner_out_name in inner_outputs:
            if inner_out_name in cases_block.vars:
                out_vars.append(cases_block.var(inner_out_name))

        # First, create an op that will determine whether or not this is the
        # conditional variable to execute.
        should_execute_block = equal(
            fill_constant(
                shape=[1], dtype=core.VarDesc.VarType.INT32, value=self.idx),
            self.case_to_execute)

        # Scratch scope the conditional_block op uses while running the
        # sub-block.
        step_scope = cases_block.create_var(
            type=core.VarDesc.VarType.STEP_SCOPES)

        cases_block.append_op(
            type='conditional_block',
            inputs={'X': [should_execute_block],
                    'Params': param_list},
            outputs={'Out': out_vars,
                     'Scope': [step_scope]},
            attrs={
                'sub_block': self.block,
                'is_scalar_condition': self.is_scalar_condition
            })

        # Serialized case descriptor: index, action kind, channel name,
        # value name.
        return '%s,%s,%s,%s' % (self.idx, self.action, self.channel.name
                                if self.channel else '', self.value.name
                                if self.value else '')

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always return to the parent block, even on exception.
        self.main_program.rollback()
        if exc_type is not None:
            return False  # re-raise exception
        return True


class Select(BlockGuard):
    """Context manager implementing a CSP-style ``select`` over channels.

    Usage: open a ``Select``, declare one ``case(...)`` per channel
    send/receive (plus an optional ``default()``), and on exit a single
    ``select`` op is appended to the parent block wrapping all the case
    sub-blocks.
    """

    def __init__(self, name=None):
        self.helper = LayerHelper('select', name=name)
        # SelectCase objects, in declaration order (order defines case idx).
        self.cases = []

        super(Select, self).__init__(self.helper.main_program)
        # Runtime-selected case index; initialized to -1 (no case chosen)
        # and written by the select op during execution.
        self.case_to_execute = fill_constant(
            shape=[1], dtype=core.VarDesc.VarType.INT32, value=-1)

    def __enter__(self):
        # BlockGuard.__enter__ pushes the new block that will hold the
        # per-case conditional blocks.
        super(Select, self).__enter__()
        return self

    def case(self, channel_action_fn, channel, value):
        """Create a new block for this condition.
        """
        select_case = SelectCase(
            len(self.cases), self.case_to_execute, channel_action_fn, channel,
            value)

        self.cases.append(select_case)

        return select_case

    def default(self):
        """Create a default case block for this condition.
        """
        default_case = SelectCase(len(self.cases), self.case_to_execute)

        self.cases.append(default_case)

        return default_case

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            return False

        # Create a select op and another block to wrap its
        # case blocks.
        select_block = self.helper.main_program.current_block()
        parent_block = self.helper.main_program.block(select_block.parent_idx)

        # Construct each case op, inside the newly created select block.
        serialized_cases = []
        for case in self.cases:
            serialized_cases.append(case.construct_op())

        # Names produced inside the case sub-blocks (intermediates) vs.
        # names consumed from outside (params / inputs of the select op).
        intermediate = set()
        params = set()

        for case_block in select_block.ops:
            # Only conditional_block ops carry a 'sub_block'; scan their
            # interior ops for inputs/outputs.
            if case_block.attrs and 'sub_block' in case_block.attrs:
                for each_op in case_block.attrs['sub_block'].ops:
                    assert isinstance(each_op, Operator)
                    for iname in each_op.input_names:
                        for in_var_name in each_op.input(iname):
                            # Not yet produced inside → comes from outside.
                            if in_var_name not in intermediate:
                                params.add(in_var_name)

                    for oname in each_op.output_names:
                        for out_var_name in each_op.output(oname):
                            intermediate.add(out_var_name)

        # Outputs of the select op: intermediates that also exist in the
        # parent block (i.e. values visible to the caller).
        out_list = [
            parent_block.var(var_name) for var_name in parent_block.vars
            if var_name in intermediate
        ]

        # Inputs: every externally-read name, resolved via recursive lookup
        # through enclosing blocks.
        X = [select_block.var_recursive(x_name) for x_name in params]

        # Needs to be used by `equal` inside the cases block.
        X.append(self.case_to_execute)

        # Construct the select op.
        parent_block.append_op(
            type='select',
            inputs={'X': X,
                    'case_to_execute': self.case_to_execute},
            attrs={'sub_block': select_block,
                   'cases': serialized_cases},
            outputs={'Out': out_list})

        # BlockGuard.__exit__ pops the select block off the program.
        return super(Select, self).__exit__(exc_type, exc_val, exc_tb)


def make_channel(dtype, capacity=0):
"""
Helps implementation of a concurrent program by creating a "channel" of
Expand Down
Loading

0 comments on commit 42a0798

Please sign in to comment.