From e97969d08df104604a97c964e31b7782032dd66f Mon Sep 17 00:00:00 2001 From: Andrei-Octavian Brabete Date: Wed, 29 May 2019 21:35:39 +0100 Subject: [PATCH] Clean: Remove len field from DataCallback --- srcs/cpp/include/kungfu.h | 2 +- srcs/cpp/include/kungfu_base.h | 6 ++--- srcs/cpp/src/kungfu_base.cpp | 2 +- srcs/cpp/src/tensorflow/ops/p2p.cpp | 39 ++++++++++------------------- srcs/go/libkungfu-comm/main.go | 2 +- 5 files changed, 19 insertions(+), 32 deletions(-) diff --git a/srcs/cpp/include/kungfu.h b/srcs/cpp/include/kungfu.h index 4cd6f0f4c..2da956e50 100644 --- a/srcs/cpp/include/kungfu.h +++ b/srcs/cpp/include/kungfu.h @@ -57,7 +57,7 @@ extern void order_group_wait(order_group_t *); #include typedef std::function DoneCallback; -typedef std::function DataCallback; +typedef std::function DataCallback; extern int KungfuReduce(const void *sendbuf, void *recvbuf, int count, KungFu_Datatype dtype, KungFu_Op op, const char *name, diff --git a/srcs/cpp/include/kungfu_base.h b/srcs/cpp/include/kungfu_base.h index 4657b0445..c134685ad 100644 --- a/srcs/cpp/include/kungfu_base.h +++ b/srcs/cpp/include/kungfu_base.h @@ -13,7 +13,7 @@ extern void invoke_callback(callback_t *); extern void delete_callback(callback_t *); typedef struct data_callback_s data_callback_t; -extern void invoke_data_callback(data_callback_t *, void *, int len); +extern void invoke_data_callback(data_callback_t *, void *); extern void delete_data_callback(data_callback_t *); extern void float16_sum(void *z, const void *x, const void *y, int len); @@ -37,12 +37,12 @@ struct CallbackWrapper { }; struct data_callback_s { - using func_t = std::function; + using func_t = std::function; public: explicit data_callback_s(const func_t &f) : f_(f) {} - void operator()(void *data, int len) { f_(data, len); } + void operator()(void *data) { f_(data); } private: func_t f_; diff --git a/srcs/cpp/src/kungfu_base.cpp b/srcs/cpp/src/kungfu_base.cpp index 14e7b26ec..2d17fac12 100644 --- a/srcs/cpp/src/kungfu_base.cpp +++ b/srcs/cpp/src/kungfu_base.cpp @@ -13,7 +13,7 @@ void invoke_callback(callback_t *f) { (*f)(); } void delete_callback(callback_t *f) { delete f; } -void invoke_data_callback(data_callback_t *f, void *data, int len) { (*f)(data, len); } +void invoke_data_callback(data_callback_t *f, void *data) { (*f)(data); } void delete_data_callback(data_callback_t *f) { delete f; } diff --git a/srcs/cpp/src/tensorflow/ops/p2p.cpp b/srcs/cpp/src/tensorflow/ops/p2p.cpp index 828fe9b16..15bb96140 100644 --- a/srcs/cpp/src/tensorflow/ops/p2p.cpp +++ b/srcs/cpp/src/tensorflow/ops/p2p.cpp @@ -116,9 +116,7 @@ class RequestModel : public AsyncOpKernel errors::InvalidArgument("ranks_ must not be empty")); total_buf_size_ = 0; - for (int s : var_sizes_) { - total_buf_size_ += s; - } + for (int s : var_sizes_) { total_buf_size_ += s; } } public: @@ -143,15 +141,12 @@ class RequestModel : public AsyncOpKernel std::uniform_int_distribution dist(0, ranks_.size() - 1); int destination = dist(engine); - while (destination == self_rank_) { - destination = dist(engine); - } + while (destination == self_rank_) { destination = dist(engine); } - std::function func = [ - &, modelBuf = modelBuf, type_size_bytes_ = type_size_bytes_, - outputs = outputs, var_sizes_ = var_sizes_, done = done - ]() - { + std::function func = [&, modelBuf = modelBuf, + type_size_bytes_ = type_size_bytes_, + outputs = outputs, + var_sizes_ = var_sizes_, done = done]() { std::lock_guard l(mu_); int offset = 0; @@ -217,9 +212,7 @@ class SaveModel : public OpKernel // number of floats it has total_buf_size_ = 0; - for (int s : var_sizes_) { - total_buf_size_ += s; - } + for (int s : var_sizes_) { total_buf_size_ += s; } modelBuf = (unsigned char *)malloc(total_buf_size_ * type_size_bytes_); } @@ -322,9 +315,7 @@ class RequestModelWithPrefetch : public OpKernel errors::InvalidArgument("ranks_ must not be empty")); total_buf_size_ = 0; - for (int s : var_sizes_) { - total_buf_size_ += s; - } + for (int s : var_sizes_) { total_buf_size_ += s; } } public: @@ -345,9 +336,7 @@ class RequestModelWithPrefetch : public OpKernel std::uniform_int_distribution dist(0, ranks_.size() - 1); int destination = dist(engine); - while (destination == self_rank_) { - destination = dist(engine); - } + while (destination == self_rank_) { destination = dist(engine); } // Fill in the model Buffer with response from random peer if (modelBuf == nullptr) { @@ -357,12 +346,10 @@ class RequestModelWithPrefetch : public OpKernel _kungfu_world->Request(destination, (void *)modelBuf, total_buf_size_, to_kungfu_type(context->input(0).dtype())); - prefetchCallback = [ - &, modelBuf = modelBuf, prefetchBuf = prefetchBuf, - total_buf_size_ = total_buf_size_, - type_size_bytes_ = type_size_bytes_ - ]() - { + prefetchCallback = [&, modelBuf = modelBuf, + prefetchBuf = prefetchBuf, + total_buf_size_ = total_buf_size_, + type_size_bytes_ = type_size_bytes_]() { std::lock_guard l(mu_); std::copy(prefetchBuf, prefetchBuf + total_buf_size_ * type_size_bytes_, diff --git a/srcs/go/libkungfu-comm/main.go b/srcs/go/libkungfu-comm/main.go index efdb2c3f9..311193e78 100644 --- a/srcs/go/libkungfu-comm/main.go +++ b/srcs/go/libkungfu-comm/main.go @@ -51,7 +51,7 @@ func GoKungfuRank() int { func GoKungfuRegisterDataCallback(name *C.char, handle *C.data_callback_t) int { sess := kungfu.CurrentSession() return sess.RegisterDataCallback(C.GoString(name), func(msg *rch.Message) { - C.invoke_data_callback(handle, unsafe.Pointer(&msg.Data[0]), C.int(msg.Length)) + C.invoke_data_callback(handle, unsafe.Pointer(&msg.Data[0])) }) }