[DPER] Introduce barrier operation to force synchronization of threads in async execution (#49322)

Summary:
Pull Request resolved: #49322

In some cases async execution might lose dependencies (e.g. for alias-like ops) or produce suboptimal scheduling when there is a choice of which parts to schedule first. An example of the latter behavior can happen in ModelParallel training, where a copy can get lower priority compared to the rest of the execution on the given GPU, which will cause other GPUs to starve.

This operator makes it possible to address these issues by introducing extra explicit dependencies between ops.
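
As a hedged sketch (not part of this diff; the blob names are hypothetical, only the AsyncNetBarrier op itself comes from this change), the barrier is inserted in-place over the blobs produced by the copies, so every op that reads those blobs now also depends on the barrier, and the barrier depends on all of the copies:

from caffe2.python import core

# Hypothetical blobs produced by cross-GPU copy ops.
copied_blobs = ["copied_act_{}".format(i) for i in range(4)]

# In-place barrier: the same blobs are used as inputs and outputs, so downstream
# consumers cannot be scheduled by async_scheduling until every copy has finished.
barrier_op = core.CreateOperator(
    "AsyncNetBarrier",
    copied_blobs,
    copied_blobs,
)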

Test Plan:
Unit test.
E2E testing in future diffs.

Reviewed By: xianjiec

Differential Revision: D24933471

fbshipit-source-id: 1668994c7856d73926cde022378a99e1e8db3567
kennyhorror authored and facebook-github-bot committed Dec 16, 2020
1 parent 7518f54 commit 46debe7
Showing 4 changed files with 119 additions and 0 deletions.
50 changes: 50 additions & 0 deletions caffe2/operators/async_net_barrier_op.cc
@@ -0,0 +1,50 @@
#include "caffe2/operators/async_net_barrier_op.h"

namespace caffe2 {

namespace {
std::pair<std::vector<DeviceOption>, std::vector<DeviceOption>>
asyncBarrierOpDevInfer(const OperatorDef& def) {
  auto op_device =
      def.has_device_option() ? def.device_option() : DeviceOption();
  ArgumentHelper helper(def);
  auto cross_device = helper.GetSingleArgument<int>("cross_device", 0);
  std::vector<DeviceOption> opt;
  for (int i = 0; i < def.input().size(); ++i) {
    if (cross_device == 1) {
      DeviceOption dev;
      dev.set_device_type(op_device.device_type());
      dev.set_device_id(i);
      opt.push_back(dev);
    } else {
      opt.push_back(op_device);
    }
  }
  return std::make_pair(opt, opt);
}
} // namespace

OPERATOR_SCHEMA(AsyncNetBarrier)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .IdenticalTypeAndShape()
    .InputsCanCrossDevices()
    .AllowOneToOneInplace()
    .DeviceInferenceFunction(asyncBarrierOpDevInfer)
    .SetDoc(R"DOC(
This is pretty much a no-op operator, since its only purpose is to make sure
that async_scheduling will schedule certain operations earlier than others.
An example where this operator can work well is a mixture of data-parallel and
model-parallel training, where one wants to force all copies to be started
before the data-parallel part starts.
)DOC")
    .Arg(
        "cross_device",
        "Specifies whether the inputs should be placed on different devices during device inference");

SHOULD_NOT_DO_GRADIENT(AsyncNetBarrier);
REGISTER_CPU_OPERATOR(AsyncNetBarrier, AsyncNetBarrierOp<CPUContext>);


} // namespace caffe2
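
For illustration only (not part of the diff), a hedged sketch of how the cross_device argument interacts with the device inference above: with cross_device=1, asyncBarrierOpDevInfer keeps the op's device type but assigns input/output i to device_id i, so the barrier can span blobs that live on different GPUs. The blob names here are hypothetical:

from caffe2.python import core

# One hypothetical blob per GPU, e.g. partial activations on devices 0..3.
per_gpu_blobs = ["act_gpu_{}".format(i) for i in range(4)]

# cross_device=1 makes device inference place input/output i on device_id i
# (same device type as the op), matching InputsCanCrossDevices in the schema.
cross_gpu_barrier = core.CreateOperator(
    "AsyncNetBarrier",
    per_gpu_blobs,
    per_gpu_blobs,
    cross_device=1,
)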
8 changes: 8 additions & 0 deletions caffe2/operators/async_net_barrier_op.cu
@@ -0,0 +1,8 @@
#include "caffe2/core/context_gpu.h"
#include "caffe2/operators/async_net_barrier_op.h"

namespace caffe2 {

REGISTER_CUDA_OPERATOR(AsyncNetBarrier, AsyncNetBarrierOp<CUDAContext>);

} // namespace caffe2
30 changes: 30 additions & 0 deletions caffe2/operators/async_net_barrier_op.h
@@ -0,0 +1,30 @@
#ifndef CAFFE2_OPERATORS_ASYNC_BARRIER_OP_H_
#define CAFFE2_OPERATORS_ASYNC_BARRIER_OP_H_

#include "caffe2/core/context.h"
#include "caffe2/core/export_caffe2_op_to_c10.h"
#include "caffe2/core/operator.h"

namespace caffe2 {

template <class Context>
class AsyncNetBarrierOp : public Operator<Context> {
 public:
  USE_OPERATOR_CONTEXT_FUNCTIONS;
  USE_SIMPLE_CTOR_DTOR(AsyncNetBarrierOp)

  bool RunOnDevice() override {
    // This is pretty much a no-op operator, since its only purpose is to make
    // sure that async_scheduling will schedule certain operations earlier
    // than others.
    //
    // An example where this operator can work well is a mixture of
    // data-parallel and model-parallel training, where one wants to force all
    // copies to be started before the data-parallel part starts.
    return true;
  }
};

} // namespace caffe2

#endif // CAFFE2_OPERATORS_ASYNC_BARRIER_OP_H_
31 changes: 31 additions & 0 deletions caffe2/python/operator_test/async_net_barrier_test.py
@@ -0,0 +1,31 @@
#!/usr/bin/env python3

import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core
from hypothesis import given


class TestAsyncNetBarrierOp(hu.HypothesisTestCase):
    @given(
        n=st.integers(1, 5),
        shape=st.lists(st.integers(0, 5), min_size=1, max_size=3),
        **hu.gcs
    )
    def test_async_net_barrier_op(self, n, shape, dc, gc):
        test_inputs = [
            (100 * np.random.random(shape)).astype(np.float32) for _ in range(n)
        ]
        test_input_blobs = ["x_{}".format(i) for i in range(n)]

        barrier_op = core.CreateOperator(
            "AsyncNetBarrier",
            test_input_blobs,
            test_input_blobs,
            device_option=gc,
        )

        def reference_func(*args):
            self.assertEqual(len(args), n)
            return args

        self.assertReferenceChecks(gc, barrier_op, test_inputs, reference_func)
