Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ jobs:
git submodule update --init --recursive
mkdir -p build
cd build
cmake .. -DBUILD_TEST=ON -Dlibuv_ROOT=${{ env.libuv_path }} `
cmake .. -DBUILD_TEST=ON `
-DUSE_LIBUV=ON -Dlibuv_ROOT=${{ env.libuv_path }} `
-DGTEST_LIBRARY=${{ env.gtest_lib_path }}/lib/gtest.lib `
-DGTEST_INCLUDE_DIR=${{ env.gtest_lib_path }}/include `
-DGTEST_MAIN_LIBRARY=${{ env.gtest_lib_path }}/lib/gtest_main.lib
Expand Down
13 changes: 10 additions & 3 deletions gloo/common/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@
#include <limits>
#include <vector>

#include "gloo/common/error.h"
#include "gloo/common/string.h"

namespace gloo {

// Logging helpers. Every enabled level funnels through GLOO_LOG_MSG, which
// streams a single string to std::cerr.

// Hands "[", the current file, ":", the current line, "] ", the severity
// label, a space, the caller's arguments and a trailing newline to
// ::gloo::MakeString, and writes the resulting value to std::cerr.
// __FILE__ and __LINE__ are expanded at the call site of the macro.
#define GLOO_LOG_MSG(severity, ...)  \
  std::cerr << ::gloo::MakeString(   \
      "[", __FILE__, ":", __LINE__, "] ", severity, " ", __VA_ARGS__, "\n")

// DEBUG logging is disabled: the macro expands to nothing, so its arguments
// are never evaluated. To turn it on, define it as
// GLOO_LOG_MSG("DEBUG", __VA_ARGS__).
#define GLOO_DEBUG(...)

// Enabled severities, each tagging the message with its label.
#define GLOO_INFO(...) GLOO_LOG_MSG("INFO", __VA_ARGS__)
#define GLOO_WARN(...) GLOO_LOG_MSG("WARN", __VA_ARGS__)
#define GLOO_ERROR(...) GLOO_LOG_MSG("ERROR", __VA_ARGS__)

class EnforceNotMet : public std::exception {
public:
EnforceNotMet(
Expand Down Expand Up @@ -157,7 +167,4 @@ BINARY_COMP_HELPER(LessEquals, <=)
#define GLOO_ENFORCE_GT(x, y, ...) \
GLOO_ENFORCE_THAT_IMPL(Greater((x), (y)), #x " > " #y, __VA_ARGS__)

#define GLOO_ERROR(...) \
std::cerr << "Gloo error: " << ::gloo::MakeString(__VA_ARGS__) << std::endl

} // namespace gloo
19 changes: 19 additions & 0 deletions gloo/test/base_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const std::vector<Transport> kTransportsForClassAlgorithms = {
#if GLOO_HAVE_TRANSPORT_TCP_TLS
Transport::TCP_TLS,
#endif
#if GLOO_HAVE_TRANSPORT_IBVERBS
Transport::IBVERBS,
#endif
};

// Transports that function algorithms can be tested against.
Expand All @@ -32,7 +35,12 @@ const std::vector<Transport> kTransportsForFunctionAlgorithms = {
#if GLOO_HAVE_TRANSPORT_TCP_TLS
Transport::TCP_TLS,
#endif
#if GLOO_HAVE_TRANSPORT_UV
Transport::UV,
#endif
#if GLOO_HAVE_TRANSPORT_IBVERBS
Transport::IBVERBS,
#endif
};

std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) {
Expand All @@ -59,6 +67,17 @@ std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) {
return ::gloo::transport::uv::CreateDevice(kDefaultDevice);
#endif
}
#endif
#if GLOO_HAVE_TRANSPORT_IBVERBS
if (transport == Transport::IBVERBS) {
gloo::transport::ibverbs::attr attr;
attr.port = 1;
try {
return ::gloo::transport::ibverbs::CreateDevice(attr);
} catch (const InvalidOperationException& e) {
GLOO_INFO("IBVERBS not available: ", e.what());
}
}
#endif
return nullptr;
}
Expand Down
6 changes: 6 additions & 0 deletions gloo/test/base_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#include "gloo/transport/uv/device.h"
#endif

#if GLOO_HAVE_TRANSPORT_IBVERBS
#include "gloo/transport/ibverbs/device.h"
#endif

namespace gloo {
namespace test {

Expand Down Expand Up @@ -64,6 +68,7 @@ enum Transport {
TCP_TLS,
#endif
UV,
IBVERBS,
};

extern const std::vector<Transport> kTransportsForClassAlgorithms;
Expand Down Expand Up @@ -117,6 +122,7 @@ class BaseTest : public ::testing::Test {
// socket address.
auto device = device_creator(transport);
if (!device) {
GTEST_SKIP() << "Skipping test: transport not available";
return;
}
context->connectFullMesh(store, device);
Expand Down
4 changes: 1 addition & 3 deletions gloo/test/send_recv_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include <array>
#include <unordered_set>

#include "gloo/transport/tcp/unbound_buffer.h"

namespace gloo {
namespace test {
namespace {
Expand Down Expand Up @@ -515,7 +513,7 @@ INSTANTIATE_TEST_CASE_P(
SendRecvDefault,
SendRecvTest,
::testing::Combine(
::testing::Values(Transport::TCP, Transport::UV),
::testing::ValuesIn(kTransportsForFunctionAlgorithms),
::testing::Values(2, 3, 4, 5, 6, 7, 8),
::testing::Values(1)));

Expand Down
4 changes: 4 additions & 0 deletions gloo/test/tcp_test.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <gtest/gtest.h>

#if GLOO_HAVE_TRANSPORT_TCP

#include <gloo/transport/tcp/helpers.h>
#include <gloo/transport/tcp/loop.h>

Expand Down Expand Up @@ -34,3 +36,5 @@ TEST(TcpTest, ConnectTimeout) {
} // namespace tcp
} // namespace transport
} // namespace gloo

#endif // GLOO_HAVE_TRANSPORT_TCP
2 changes: 2 additions & 0 deletions gloo/transport/ibverbs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ list(APPEND GLOO_TRANSPORT_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/device.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/memory_region.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/pair.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/unbound_buffer.cc"
)

list(APPEND GLOO_TRANSPORT_HDRS
Expand All @@ -14,6 +15,7 @@ list(APPEND GLOO_TRANSPORT_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}/device.h"
"${CMAKE_CURRENT_SOURCE_DIR}/memory_region.h"
"${CMAKE_CURRENT_SOURCE_DIR}/pair.h"
"${CMAKE_CURRENT_SOURCE_DIR}/unbound_buffer.h"
)

set(GLOO_TRANSPORT_SRCS ${GLOO_TRANSPORT_SRCS} PARENT_SCOPE)
Expand Down
44 changes: 26 additions & 18 deletions gloo/transport/ibverbs/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ Buffer::Buffer(Pair* pair, int slot, void* ptr, size_t size)
ex_(nullptr) {
mr_ = ibv_reg_mr(
pair_->dev_->pd_,
ptr_,
size_,
size == 0 ? static_cast<void*>(&emptyBuf_) : ptr,
size == 0 ? sizeof(emptyBuf_) : size,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);

// Provide hint if the error is EFAULT and nv_peer_mem is not loaded
Expand Down Expand Up @@ -59,7 +59,12 @@ Buffer::Buffer(Pair* pair, int slot, void* ptr, size_t size)
}

Buffer::~Buffer() {
GLOO_ENFORCE_EQ(sendPending_, 0, "Destructing buffer expecting completions");
std::lock_guard<std::mutex> lock(m_);
if (sendPending_ > 0) {
GLOO_WARN(
"Destructing buffer with pending sends, sendPending_=", sendPending_);
}

ibv_dereg_mr(mr_);
}

Expand Down Expand Up @@ -167,36 +172,40 @@ void Buffer::waitSend() {
}

void Buffer::send(size_t offset, size_t length, size_t roffset) {
// Can't assert on roffset, since we don't know the size of
// the remote buffer. Refactor of initialization code needed
// to support this.
GLOO_ENFORCE_LE(offset + length, size_);

{
std::unique_lock<std::mutex> lock(m_);

// Can't assert on roffset, since we don't know the size of
// the remote buffer. Refactor of initialization code needed
// to support this.
GLOO_ENFORCE_LE(offset + length, size_);

checkErrorState();
}

if (debug_) {
std::cout << "[" << getpid() << "] ";
std::cout << "send " << length << " bytes";
std::cout << std::endl;
if (debug_) {
std::cout << "[" << getpid() << "] ";
std::cout << "send " << length << " bytes";
std::cout << std::endl;
}

// Increment number of sends in flight
sendPending_++;
}

// Increment number of sends in flight
sendPending_++;
// Release lock before calling into the pair to avoid deadlock.

pair_->send(this, offset, length, roffset);
}

void Buffer::handleCompletion(struct ibv_wc* wc) {
void Buffer::handleCompletion(int rank, struct ibv_wc* wc) {
std::unique_lock<std::mutex> lock(m_);

if (wc->opcode & IBV_WC_RECV) {
if (debug_) {
std::cout << "[" << getpid() << "] ";
std::cout << "recv " << wc->byte_len << " bytes";
std::cout << std::endl;
}
std::unique_lock<std::mutex> lock(m_);
recvCompletions_++;
recvCv_.notify_one();
} else if (wc->opcode == IBV_WC_RDMA_WRITE) {
Expand All @@ -205,7 +214,6 @@ void Buffer::handleCompletion(struct ibv_wc* wc) {
std::cout << "send complete";
std::cout << std::endl;
}
std::unique_lock<std::mutex> lock(m_);
sendCompletions_++;
sendPending_--;
sendCv_.notify_one();
Expand Down
14 changes: 11 additions & 3 deletions gloo/transport/ibverbs/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace gloo {
namespace transport {
namespace ibverbs {

class Buffer : public ::gloo::transport::Buffer {
class Buffer : public ::gloo::transport::Buffer, public BufferHandler {
public:
virtual ~Buffer();

Expand All @@ -33,18 +33,26 @@ class Buffer : public ::gloo::transport::Buffer {
virtual void waitRecv() override;
virtual void waitSend() override;

void handleCompletion(struct ibv_wc* wc);
void handleCompletion(int rank, struct ibv_wc* wc) override;

void signalError(const std::exception_ptr& ex);
void signalError(const std::exception_ptr& ex) override;
void checkErrorState();

// Override that unconditionally reports this buffer as a persistent handler.
// NOTE(review): presumably this tells the completion dispatcher to keep the
// handler registered after a completion is delivered -- confirm against the
// base-class declaration, which is not visible here.
// NOTE(review): the name is spelled "Peristent" (sic); because of `override`
// it has to match the base-class declaration exactly.
bool isPeristentHandler() override {
return true;
}

protected:
// May only be constructed from helper function in pair.cc
Buffer(Pair* pair, int slot, void* ptr, size_t size);

Pair* pair_;

// Empty buffer to use when a nullptr buffer is created.
char emptyBuf_[1];

struct ibv_mr* mr_;
std::unique_ptr<struct ibv_mr> peerMr_;

std::mutex m_;
std::condition_variable recvCv_;
Expand Down
19 changes: 15 additions & 4 deletions gloo/transport/ibverbs/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "gloo/common/error.h"
#include "gloo/transport/ibverbs/device.h"
#include "gloo/transport/ibverbs/pair.h"
#include "gloo/transport/ibverbs/unbound_buffer.h"

namespace gloo {
namespace transport {
Expand All @@ -23,16 +24,26 @@ Context::~Context() {}

std::unique_ptr<transport::Pair>& Context::createPair(int rank) {
pairs_[rank] = std::unique_ptr<transport::Pair>(
new ibverbs::Pair(device_, getTimeout()));
new ibverbs::Pair(rank, device_, getTimeout()));
return pairs_[rank];
}

std::unique_ptr<transport::UnboundBuffer> Context::createUnboundBuffer(
void* ptr,
size_t size) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION(
"Unbound buffers not supported yet for ibverbs transport");
return std::unique_ptr<transport::UnboundBuffer>();
return std::make_unique<UnboundBuffer>(this->shared_from_this(), ptr, size);
}

// Forwards the failure description `msg` to every pair owned by this context.
//
// Each non-null entry of `pairs_` receives `msg` through
// ibverbs::Pair::signalIoFailure(). Null entries are skipped.
void Context::signalException(const std::string& msg) {
  // The `pairs_` vector is logically constant. After the context and
  // all of its pairs have been created it is not mutated until the
  // context is destructed. Therefore, we don't need to acquire this
  // context's instance lock before looping over `pairs_`.
  for (const auto& pair : pairs_) {
    if (pair) {
      // Context::createPair() in this file only stores ibverbs::Pair
      // instances in `pairs_`, so the downcast is valid. Use static_cast
      // rather than reinterpret_cast: it is checked against the class
      // hierarchy at compile time and applies any base-to-derived pointer
      // adjustment, which reinterpret_cast does not.
      static_cast<ibverbs::Pair*>(pair.get())->signalIoFailure(msg);
    }
  }
}

} // namespace ibverbs
Expand Down
6 changes: 6 additions & 0 deletions gloo/transport/ibverbs/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ class Context : public ::gloo::transport::Context,
void* ptr,
size_t size) override;

// Set exception on every pair in this context. This is called when
// waiting for a send or recv operation on an unbound buffer times
// out. All pairs should be signaled and closed in that event.
void signalException(const std::string& msg);

protected:
std::shared_ptr<Device> device_;

friend class Pair;
friend class UnboundBuffer;
};

} // namespace ibverbs
Expand Down
Loading
Loading