Merge pull request apache#48 from antinucleon/master
Switch to DMLC logging
tqchen committed Sep 15, 2015
2 parents 811c396 + 9ecd862 commit d3d6147
Showing 31 changed files with 492 additions and 330 deletions.
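
Every file in this commit follows the same mechanical pattern: mshadow's printf-style utils::Error / utils::Assert / utils::Check calls are replaced by the streaming LOG(FATAL) and CHECK_* macros from DMLC logging. A minimal sketch of the mapping, assuming dmlc/logging.h from dmlc-core is on the include path (the function and its arguments are illustrative, not taken from the diff):

#include <dmlc/logging.h>

// Illustrative mapping from the old utils:: calls to the DMLC macros.
void Example(int devid, bool contiguous) {
  // was: utils::Check(devid >= 0, "invalid devid");
  CHECK_GE(devid, 0) << "invalid devid";            // prints both operand values on failure
  // was: utils::Assert(contiguous, "data must be contiguous");
  CHECK(contiguous) << "data must be contiguous";   // aborts with the streamed message if false
  // was: utils::Error("unreachable state %d", devid);
  LOG(FATAL) << "unreachable state " << devid;      // unconditional: logs the message, then aborts
}

The streamed message is only evaluated when a check fails, so the migration also avoids string-formatting costs on the happy path.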
8 changes: 4 additions & 4 deletions guide/config.mk
@@ -3,10 +3,10 @@
 #
 # This is a configuration script that you can use to compile mshadow
 # Usage:
-#
+#
 # include config.mk in your Makefile, or directly include the definition of variables
 # include mshadow.mk after the variables are set
-#
+#
 # Add MSHADOW_CFLAGS to the compile flags
 # Add MSHADOW_LDFLAGS to the linker flags
 # Add MSHADOW_NVCCFLAGS to the nvcc compile flags
@@ -22,11 +22,11 @@ USE_CUDA_PATH = NONE
 #
 # choose the version of blas you want to use
 # can be: mkl, blas, atlas, openblas, apple
-USE_BLAS = atlas
+USE_BLAS = openblas
 #
 # add path to intel library, you may need it
 # for MKL, if you did not add the path to environment variable
-#
+#
 USE_INTEL_PATH = NONE
 
 # whether compile with parameter server
6 changes: 3 additions & 3 deletions mshadow-ps/mshadow_ps.h
@@ -303,7 +303,7 @@ class IModelUpdater {
    * \param data the tensor data corresponding to the data we want to initialize
    */
   virtual void InitModel_(int key, Tensor<cpu, 1, DType> data) {
-    utils::Error("InitModel: not implemented");
+    LOG(FATAL) << "InitModel: not implemented";
   }
   /*!
    * \brief update the model, user can implement this one
@@ -312,7 +312,7 @@ class IModelUpdater {
    * \param data the tensor data corresponding to the data we want to initialize
    */
   virtual void Update_(int key, Tensor<cpu, 1, DType> data) {
-    utils::Error("InitModel: not implemented");
+    LOG(FATAL) << "InitModel: not implemented";
   }
 };
 /*!
@@ -350,7 +350,7 @@ inline ISharedModel<xpu, DType> *CreateSharedModel(const char *type) {
 #if MSHADOW_DIST_PS
   if (!strcmp("dist", type)) return new DistModel<xpu, DType>();
 #endif
-  utils::Error("unknown server type %s\n", type);
+  LOG(FATAL) << "unknown server type " << type;
   return NULL;
 }
 }  // namespace ps
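
For callers the factory keeps its signature; only the failure path changes. A hypothetical usage sketch (the "local" type string and the surrounding main are assumptions for illustration; this hunk only shows the "dist" branch and the fatal fallthrough):

#include "mshadow-ps/mshadow_ps.h"

int main() {
  // "local" is an assumed type string handled by LocalModel, not visible in this hunk.
  mshadow::ps::ISharedModel<mshadow::cpu, float> *ps =
      mshadow::ps::CreateSharedModel<mshadow::cpu, float>("local");
  // Any unrecognized string now aborts via LOG(FATAL) instead of utils::Error,
  // and "dist" is only compiled in when MSHADOW_DIST_PS is set.
  delete ps;
  return 0;
}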
2 changes: 1 addition & 1 deletion mshadow-ps/ps_dist-inl.h
@@ -81,7 +81,7 @@ class DistModel : public LocalModel<xpu, DType> {
 
     // push and pull
     Tensor<cpu, 2> sendrecv = data[0];
-    utils::Assert(data[0].CheckContiguous(), "data must be contiguous");
+    CHECK_EQ(data[0].CheckContiguous(), true) << "data must be contiguous";
 
     int ts = shared_model_.Push(
         ::ps::Parameter::Request(key), sendrecv.dptr_, sendrecv.MSize(), false);
84 changes: 36 additions & 48 deletions mshadow-ps/ps_local-inl.h
@@ -96,7 +96,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
       if (!strcmp(val, "sum")) {
         push_operation[key] = kSum; return;
       }
-      utils::Error("unknown push operation %s", val);
+      LOG(FATAL) << "unknown push operation " << val;
     }
     if (!strcmp(name, "reduce_thread")) {
       nthread_reduction = atoi(val);
@@ -113,8 +113,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
       } else if (!strcmp(val, "one")) {
         perdev_pull_thread = 0;
       } else {
-        utils::Error("invalid value for parameter pull_thread,"\
-                     " can only be ndev or one");
+        LOG(FATAL) << "invalid value for parameter pull_thread," << " can only be ndev or one";
       }
     }
     if (!strcmp(name, "push_thread")) {
@@ -123,8 +122,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
       } else if (!strcmp(val, "one")) {
         perdev_push_thread = 0;
       } else {
-        utils::Error("invalid value for parameter push_thread,"\
-                     " can only be ndev or one");
+        LOG(FATAL) << "invalid value for parameter push_thread," << " can only be ndev or one";
       }
     }
     if (!strcmp(name, "update_on_server")) {
@@ -144,8 +142,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
     if (p == NULL || p->wait.size() == 0) return;
     PullEntry &e = *p;
     // wake up waiters if any
-    utils::Assert(e.wait.size() == devices.size(),
-                  "PullWait: must initialize the wait");
+    CHECK_EQ(e.wait.size(), devices.size()) << "PullWait: must initialize the wait";
     PullWaitRecord &w = e.wait[wid];
     if (!w.finished) {
       wait_lock.Lock();
@@ -154,22 +151,20 @@ class LocalModel : public ISharedModel<xpu, DType> {
         wait_cond.Wait(&wait_lock);
       }
       w.nwait -= 1;
-      utils::Assert(w.nwait >= 0, "boundary check");
+      CHECK_GE(w.nwait, 0) << "boundary check";
       wait_lock.Unlock();
     }
   }
   virtual void Init(const std::vector<int> &devices) {
-    utils::Check(init_end == 0,
-                 "LocalServer.Init can only call Init once");
-    utils::Check(devices.size() != 0,
-                 "LocalServer.Init: must at least contain 1 devices");
+    CHECK_EQ(init_end, 0) << "LocalServer.Init can only call Init once";
+    CHECK_NE(devices.size(), 0) << "LocalServer.Init: must at least contain 1 devices";
     this->devices = devices;
     destroy_signal = false;
     // initialize device id to local index
     dev2index.clear();
     for (size_t i = 0; i < devices.size(); ++i) {
       int devid = devices[i];
-      utils::Assert(devid >= 0, "device id must be bigger than 0");
+      CHECK_GE(devid, 0) << "device id must be bigger than 0";
       if (devid >= static_cast<int>(dev2index.size())) {
         dev2index.resize(devid + 1, -1);
       }
@@ -244,8 +239,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
   virtual void CheckWeight_(Tensor<xpu, 2, DType> data,
                             int key,
                             int devid) {
-    utils::Check(test_on_server != 0,
-                 "must be in pair debug mode");
+    CHECK_NE(test_on_server, 0) << "must be in pair debug mode";
     PushEntry &e = push_map.GetRef(key);
     mshadow::TensorContainer<cpu, 2, DType> tmp(false);
     tmp.Resize(data.shape_);
@@ -307,10 +301,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
                        CallbackFunction callback,
                        void *callback_arg) {
     PullEntry &e = pull_map.GetRef(key);
-    utils::Assert(e.req.size() == devices.size(),
-                  "PullReq: must initialize the key, req");
-    utils::Assert(e.wait.size() == devices.size(),
-                  "PullReq: must initialize the key, wait");
+    CHECK_EQ(e.req.size(), devices.size()) << "PullReq: must initialize the key, req";
+    CHECK_EQ(e.wait.size(), devices.size()) << "PullReq: must initialize the key, wait";
     const int wid = GetWorkIndex(devid);
     PullReqRecord &r = e.req[wid];
     r.dest = data;
@@ -323,9 +315,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
     wait_lock.Unlock();
     // check ready event
     request_lock.Lock();
-    utils::Check(!r.pending,
-                 "key = %d, cannot send duplicate pull request before it finishes",
-                 key);
+    CHECK_EQ(!r.pending, true) << "key = " << key
+                               << "cannot send duplicate pull request before it finishes";
     if (e.req[wid].ready) {
       if (perdev_pull_thread != 0) {
         pull_queues[wid].Push(std::make_pair(key, devid));
@@ -344,8 +335,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
    */
   virtual void PullReady(Tensor<cpu, 2> data, int key) {
     PullEntry &e = pull_map.GetRef(key);
-    utils::Assert(e.req.size() == devices.size(),
-                  "PullReady: must initialize the key, req");
+    CHECK_EQ(e.req.size(), devices.size()) << "PullReady: must initialize the key, req";
     request_lock.Lock();
     e.src = data;
     for (index_t i = 0; i < e.req.size(); ++i) {
@@ -393,7 +383,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
         PushEntry &e = push_map.GetRef(key);
         this->PullReady(e.weight, key);
       } else {
-        utils::Assert(test_on_server != 0, "test mode");
+        CHECK_NE(test_on_server, 0) << "test mode";
         this->PullReady(data[0], key);
       }
       return;
@@ -408,7 +398,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
       this->PullReady(data.FlatTo2D(), key);
       return;
     }
-    default: utils::Error("unknown LocalOp");
+    default: LOG(FATAL) << "unknown LocalOp";
     }
   }
   /*!
@@ -425,7 +415,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
         PushEntry &e = push_map.GetRef(key);
         this->PullReady(e.weight, key);
       } else {
-        utils::Assert(test_on_server != 0, "test mode");
+        CHECK_NE(test_on_server, 0) << "test mode";
         this->PullReady(data, key);
       }
     } else {
@@ -533,8 +523,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
       mshadow::AllocSpace(&data, false);
       if (need_weight) mshadow::AllocSpace(&weight);
     }
-    utils::Assert(data.CheckContiguous(), "Init");
-    utils::Assert(!need_weight || weight.CheckContiguous(), "Init");
+    CHECK_EQ(data.CheckContiguous(), true) << "Data must be contiguous";
+    CHECK(!need_weight || weight.CheckContiguous()) << "Weight must be contiguous";
     num_copied = 0;
     copied.resize(ndevice, false);
   }
@@ -638,9 +628,12 @@ class LocalModel : public ISharedModel<xpu, DType> {
       if (queue->Pop(&tsk)) {
         const int wid = GetWorkIndex(tsk.devid);
         PushEntry &e = push_map.GetRef(tsk.key);
-        utils::Check(e.data[0][0].shape_ == tsk.data.shape_,
-                     "Tensor with same key must share same shape");
-        utils::Assert(!e.copied[wid], "data inconsistency");
+        CHECK_EQ(e.data[0][0].shape_, tsk.data.shape_)
+            << "Tensor with same key must share same shape "
+            << e.data[0][0].shape_.ToString()
+            << " vs "
+            << tsk.data.shape_.ToString();
+        CHECK_EQ(!e.copied[wid], true) << "data inconsistency";
         // start copy
         SetDevice<xpu>(tsk.devid);
         Copy(e.data[e.copyin_version][wid], tsk.data, push_stream[wid]);
@@ -663,7 +656,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
           this->HandlePushFinish(e.data[cp_version], tsk.key);
         }
       } else {
-        utils::Assert(destroy_signal, "abort but not destroy");
+        CHECK_EQ(destroy_signal, true) << "abort but not destroy";
       }
     }
   }
@@ -681,9 +674,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
     }
   }
   inline void PushHandlerLocal(size_t tid) {
-    utils::Assert(tid < devices.size(), "threadid exceed boundary");
-    utils::Assert(push_queues.size() == devices.size(),
-                  "must have one pull_queue per device");
+    CHECK_LT(tid, devices.size()) << "threadid exceed boundary";
+    CHECK_EQ(push_queues.size(), devices.size()) << "must have one pull_queue per device";
     // allocate stream resources
     SetDevice<xpu>(devices[tid]);
     push_stream[tid] = NewStream<xpu>();
@@ -715,8 +707,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
       PullEntry &e = pull_map.GetRef(key);
       {
         // handle request
-        utils::Assert(e.req.size() == devices.size(),
-                      "PullHandler: must initialize the key, req");
+        CHECK_EQ(e.req.size(), devices.size()) << "PullHandler: must initialize the key, req";
         PullReqRecord &r = e.req[wid];
         SetDevice<xpu>(devid);
         Copy(r.dest, e.src, pull_stream[wid]);
@@ -729,8 +720,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
       }
       {
         // wake up waiters if any
-        utils::Assert(e.wait.size() == devices.size(),
-                      "PullHandler, must initialize the key, req");
+        CHECK_EQ(e.wait.size(), devices.size()) << "PullHandler, must initialize the key, req";
         PullWaitRecord &w = e.wait[wid];
         wait_lock.Lock();
         w.finished = true;
@@ -740,7 +730,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
         wait_lock.Unlock();
       }
     } else {
-      utils::Assert(destroy_signal, "abort but not destroy");
+      CHECK_EQ(destroy_signal, true) << "abort but not destroy";
     }
   }
 }
@@ -759,9 +749,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
     }
   }
   inline void PullHandlerLocal(size_t tid) {
-    utils::Assert(tid < devices.size(), "threadid exceed boundary");
-    utils::Assert(pull_queues.size() == devices.size(),
-                  "must have one pull_queue per device");
+    CHECK_LT(tid, devices.size()) << "threadid exceed boundary";
+    CHECK_EQ(pull_queues.size(), devices.size()) << "must have one pull_queue per device";
     // allocate stream resources
     SetDevice<xpu>(devices[tid]);
     pull_stream[tid] = NewStream<xpu>();
@@ -783,10 +772,9 @@ class LocalModel : public ISharedModel<xpu, DType> {
   }
   // get internal index of device
   inline int GetWorkIndex(int devid) const {
-    utils::Check(devid >= 0 &&
-                 devid < static_cast<int>(dev2index.size()) &&
-                 dev2index[devid] >= 0,
-                 "Push: invalid devid");
+    CHECK(devid >= 0 &&
+          devid < static_cast<int>(dev2index.size()) &&
+          dev2index[devid] >= 0) << "Push: invalid devid";
     return dev2index[devid];
   }
   // functions to handle pull
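
One stylistic wrinkle in the hunks above: boolean assertions are migrated sometimes as CHECK_EQ(cond, true) and sometimes as the shorter CHECK(cond). The two forms behave identically, aborting with the streamed message when the condition is false. A self-contained comparison (the stub stands in for calls like data.CheckContiguous() and is illustrative only):

#include <dmlc/logging.h>

// Stub standing in for mshadow's Tensor::CheckContiguous(); illustrative only.
bool CheckContiguous() { return true; }

int main() {
  CHECK(CheckContiguous()) << "data must be contiguous";           // idiomatic boolean form
  CHECK_EQ(CheckContiguous(), true) << "data must be contiguous";  // equivalent effect
  return 0;
}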
12 changes: 5 additions & 7 deletions mshadow-ps/ps_rabit-inl.h
@@ -60,7 +60,7 @@ class RabitModel : public LocalModel<xpu, DType> {
                             int key) {
     // summation the data from all devices
     LocalModel<xpu, DType>::ReduceSum(data);
-    utils::Assert(data[0].CheckContiguous(), "data must be contiguous");
+    CHECK_EQ(data[0].CheckContiguous(), true) << "data must be contiguous";
     ReduceTask tsk;
     tsk.data = data[0]; tsk.key = key;
     reduce_queue_.Push(tsk, 0);
@@ -87,19 +87,17 @@ class RabitModel : public LocalModel<xpu, DType> {
     while (!destroy_reduce_thread_) {
       ReduceTask tsk;
       if (reduce_queue_.Pop(&tsk)) {
-        utils::Check(disable_allreduce_ == 0,
-                     "Allreduce disabled error");
+        CHECK_EQ(disable_allreduce_, 0) << "Allreduce disabled error";
         int key = tsk.key;
         rabit::Allreduce<rabit::op::Max>(&key, 1);
-        utils::Check(key == tsk.key, "Allreduce not concensus");
+        CHECK_EQ(key, tsk.key) << "Allreduce not concensus";
         rabit::Allreduce<rabit::op::Sum>
             (tsk.data.dptr_, tsk.data.MSize());
         tsk.data *= 1.0f / rabit::GetWorldSize();
-        utils::Check(disable_allreduce_ == 0,
-                     "Allreduce disabled error");
+        CHECK_EQ(disable_allreduce_, 0) << "Allreduce disabled error";
         this->HandleReduceFinish(tsk.data, tsk.key);
       } else {
-        utils::Assert(destroy_reduce_thread_, "abort but not destroy");
+        CHECK_EQ(destroy_reduce_thread_, true) << "abort but not destroy";
       }
     }
   }
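
The reduce thread above runs a small consensus handshake before the real reduction: every worker allreduces its task key with a max operator, so if any worker popped a different key the result disagrees with at least one worker's local copy and its CHECK_EQ fires. A sketch of that idiom in isolation (assuming rabit's public Allreduce API; the function name is illustrative):

#include <rabit/rabit.h>
#include <dmlc/logging.h>

// Illustrative consensus check: all workers must be reducing the same key.
void CheckKeyConsensus(int local_key) {
  int key = local_key;
  // every worker contributes its key; all receive the maximum
  rabit::Allreduce<rabit::op::Max>(&key, 1);
  // any worker whose local key is below the maximum aborts here
  CHECK_EQ(key, local_key) << "Allreduce key mismatch across workers";
}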