diff --git a/.github/workflows/build-windows.yml b/.github/workflows/build-windows.yml index b254151da..310423b82 100644 --- a/.github/workflows/build-windows.yml +++ b/.github/workflows/build-windows.yml @@ -75,7 +75,8 @@ jobs: git submodule update --init --recursive mkdir -p build cd build - cmake .. -DBUILD_TEST=ON -Dlibuv_ROOT=${{ env.libuv_path }} ` + cmake .. -DBUILD_TEST=ON ` + -DUSE_LIBUV=ON -Dlibuv_ROOT=${{ env.libuv_path }} ` -DGTEST_LIBRARY=${{ env.gtest_lib_path }}/lib/gtest.lib ` -DGTEST_INCLUDE_DIR=${{ env.gtest_lib_path }}/include ` -DGTEST_MAIN_LIBRARY=${{ env.gtest_lib_path }}/lib/gtest_main.lib diff --git a/gloo/common/logging.h b/gloo/common/logging.h index 7f04700c3..a678fdc87 100644 --- a/gloo/common/logging.h +++ b/gloo/common/logging.h @@ -15,10 +15,20 @@ #include #include +#include "gloo/common/error.h" #include "gloo/common/string.h" namespace gloo { +#define GLOO_LOG_MSG(level, ...) \ + std::cerr << ::gloo::MakeString( \ + "[", __FILE__, ":", __LINE__, "] ", level, " ", __VA_ARGS__, "\n") + +#define GLOO_INFO(...) GLOO_LOG_MSG("INFO", __VA_ARGS__) +#define GLOO_ERROR(...) GLOO_LOG_MSG("ERROR", __VA_ARGS__) +#define GLOO_WARN(...) GLOO_LOG_MSG("WARN", __VA_ARGS__) +#define GLOO_DEBUG(...) // GLOO_LOG_MSG("DEBUG", __VA_ARGS__) + class EnforceNotMet : public std::exception { public: EnforceNotMet( @@ -157,7 +167,4 @@ BINARY_COMP_HELPER(LessEquals, <=) #define GLOO_ENFORCE_GT(x, y, ...) \ GLOO_ENFORCE_THAT_IMPL(Greater((x), (y)), #x " > " #y, __VA_ARGS__) -#define GLOO_ERROR(...) \ - std::cerr << "Gloo error: " << ::gloo::MakeString(__VA_ARGS__) << std::endl - } // namespace gloo diff --git a/gloo/test/base_test.cc b/gloo/test/base_test.cc index 6a0ecb54b..6f678ede9 100644 --- a/gloo/test/base_test.cc +++ b/gloo/test/base_test.cc @@ -21,6 +21,9 @@ const std::vector kTransportsForClassAlgorithms = { #if GLOO_HAVE_TRANSPORT_TCP_TLS Transport::TCP_TLS, #endif +#if GLOO_HAVE_TRANSPORT_IBVERBS + Transport::IBVERBS, +#endif }; // Transports that function algorithms can be tested against. @@ -32,7 +35,12 @@ const std::vector kTransportsForFunctionAlgorithms = { #if GLOO_HAVE_TRANSPORT_TCP_TLS Transport::TCP_TLS, #endif +#if GLOO_HAVE_TRANSPORT_UV Transport::UV, +#endif +#if GLOO_HAVE_TRANSPORT_IBVERBS + Transport::IBVERBS, +#endif }; std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) { @@ -59,6 +67,17 @@ std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) { return ::gloo::transport::uv::CreateDevice(kDefaultDevice); #endif } +#endif +#if GLOO_HAVE_TRANSPORT_IBVERBS + if (transport == Transport::IBVERBS) { + gloo::transport::ibverbs::attr attr; + attr.port = 1; + try { + return ::gloo::transport::ibverbs::CreateDevice(attr); + } catch (const InvalidOperationException& e) { + GLOO_INFO("IBVERBS not available: ", e.what()); + } + } #endif return nullptr; } diff --git a/gloo/test/base_test.h b/gloo/test/base_test.h index add9b8a19..4245a77cd 100644 --- a/gloo/test/base_test.h +++ b/gloo/test/base_test.h @@ -33,6 +33,10 @@ #include "gloo/transport/uv/device.h" #endif +#if GLOO_HAVE_TRANSPORT_IBVERBS +#include "gloo/transport/ibverbs/device.h" +#endif + namespace gloo { namespace test { @@ -64,6 +68,7 @@ enum Transport { TCP_TLS, #endif UV, + IBVERBS, }; extern const std::vector kTransportsForClassAlgorithms; @@ -117,6 +122,7 @@ class BaseTest : public ::testing::Test { // socket address. 
auto device = device_creator(transport); if (!device) { + GTEST_SKIP() << "Skipping test: transport not available"; return; } context->connectFullMesh(store, device); diff --git a/gloo/test/send_recv_test.cc b/gloo/test/send_recv_test.cc index 289f96d16..ad9b1d240 100644 --- a/gloo/test/send_recv_test.cc +++ b/gloo/test/send_recv_test.cc @@ -12,8 +12,6 @@ #include #include -#include "gloo/transport/tcp/unbound_buffer.h" - namespace gloo { namespace test { namespace { @@ -515,7 +513,7 @@ INSTANTIATE_TEST_CASE_P( SendRecvDefault, SendRecvTest, ::testing::Combine( - ::testing::Values(Transport::TCP, Transport::UV), + ::testing::ValuesIn(kTransportsForFunctionAlgorithms), ::testing::Values(2, 3, 4, 5, 6, 7, 8), ::testing::Values(1))); diff --git a/gloo/test/tcp_test.cc b/gloo/test/tcp_test.cc index eeb63a066..56a70057a 100644 --- a/gloo/test/tcp_test.cc +++ b/gloo/test/tcp_test.cc @@ -1,5 +1,7 @@ #include +#if GLOO_HAVE_TRANSPORT_TCP + #include #include @@ -34,3 +36,5 @@ TEST(TcpTest, ConnectTimeout) { } // namespace tcp } // namespace transport } // namespace gloo + +#endif // GLOO_HAVE_TRANSPORT_TCP diff --git a/gloo/transport/ibverbs/CMakeLists.txt b/gloo/transport/ibverbs/CMakeLists.txt index 3a5c1d660..048ca793d 100644 --- a/gloo/transport/ibverbs/CMakeLists.txt +++ b/gloo/transport/ibverbs/CMakeLists.txt @@ -5,6 +5,7 @@ list(APPEND GLOO_TRANSPORT_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/device.cc" "${CMAKE_CURRENT_SOURCE_DIR}/memory_region.cc" "${CMAKE_CURRENT_SOURCE_DIR}/pair.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/unbound_buffer.cc" ) list(APPEND GLOO_TRANSPORT_HDRS @@ -14,6 +15,7 @@ list(APPEND GLOO_TRANSPORT_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/device.h" "${CMAKE_CURRENT_SOURCE_DIR}/memory_region.h" "${CMAKE_CURRENT_SOURCE_DIR}/pair.h" + "${CMAKE_CURRENT_SOURCE_DIR}/unbound_buffer.h" ) set(GLOO_TRANSPORT_SRCS ${GLOO_TRANSPORT_SRCS} PARENT_SCOPE) diff --git a/gloo/transport/ibverbs/buffer.cc b/gloo/transport/ibverbs/buffer.cc index 1bce49d30..a60b6346e 100644 --- a/gloo/transport/ibverbs/buffer.cc +++ b/gloo/transport/ibverbs/buffer.cc @@ -30,8 +30,8 @@ Buffer::Buffer(Pair* pair, int slot, void* ptr, size_t size) ex_(nullptr) { mr_ = ibv_reg_mr( pair_->dev_->pd_, - ptr_, - size_, + size == 0 ? static_cast(&emptyBuf_) : ptr, + size == 0 ? sizeof(emptyBuf_) : size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); // Provide hint if the error is EFAULT and nv_peer_mem is not loaded @@ -59,7 +59,12 @@ Buffer::Buffer(Pair* pair, int slot, void* ptr, size_t size) } Buffer::~Buffer() { - GLOO_ENFORCE_EQ(sendPending_, 0, "Destructing buffer expecting completions"); + std::lock_guard lock(m_); + if (sendPending_ > 0) { + GLOO_WARN( + "Destructing buffer with pending sends, sendPending_=", sendPending_); + } + ibv_dereg_mr(mr_); } @@ -167,36 +172,40 @@ void Buffer::waitSend() { } void Buffer::send(size_t offset, size_t length, size_t roffset) { - // Can't assert on roffset, since we don't know the size of - // the remote buffer. Refactor of initialization code needed - // to support this. - GLOO_ENFORCE_LE(offset + length, size_); - { std::unique_lock lock(m_); + + // Can't assert on roffset, since we don't know the size of + // the remote buffer. Refactor of initialization code needed + // to support this. 
+ GLOO_ENFORCE_LE(offset + length, size_); + checkErrorState(); - } - if (debug_) { - std::cout << "[" << getpid() << "] "; - std::cout << "send " << length << " bytes"; - std::cout << std::endl; + if (debug_) { + std::cout << "[" << getpid() << "] "; + std::cout << "send " << length << " bytes"; + std::cout << std::endl; + } + + // Increment number of sends in flight + sendPending_++; } - // Increment number of sends in flight - sendPending_++; + // Release lock before calling into the pair to avoid deadlock. pair_->send(this, offset, length, roffset); } -void Buffer::handleCompletion(struct ibv_wc* wc) { +void Buffer::handleCompletion(int rank, struct ibv_wc* wc) { + std::unique_lock lock(m_); + if (wc->opcode & IBV_WC_RECV) { if (debug_) { std::cout << "[" << getpid() << "] "; std::cout << "recv " << wc->byte_len << " bytes"; std::cout << std::endl; } - std::unique_lock lock(m_); recvCompletions_++; recvCv_.notify_one(); } else if (wc->opcode == IBV_WC_RDMA_WRITE) { @@ -205,7 +214,6 @@ void Buffer::handleCompletion(struct ibv_wc* wc) { std::cout << "send complete"; std::cout << std::endl; } - std::unique_lock lock(m_); sendCompletions_++; sendPending_--; sendCv_.notify_one(); diff --git a/gloo/transport/ibverbs/buffer.h b/gloo/transport/ibverbs/buffer.h index 4a9c9aba0..d8699460f 100644 --- a/gloo/transport/ibverbs/buffer.h +++ b/gloo/transport/ibverbs/buffer.h @@ -24,7 +24,7 @@ namespace gloo { namespace transport { namespace ibverbs { -class Buffer : public ::gloo::transport::Buffer { +class Buffer : public ::gloo::transport::Buffer, public BufferHandler { public: virtual ~Buffer(); @@ -33,18 +33,26 @@ class Buffer : public ::gloo::transport::Buffer { virtual void waitRecv() override; virtual void waitSend() override; - void handleCompletion(struct ibv_wc* wc); + void handleCompletion(int rank, struct ibv_wc* wc) override; - void signalError(const std::exception_ptr& ex); + void signalError(const std::exception_ptr& ex) override; void checkErrorState(); + bool isPeristentHandler() override { + return true; + } + protected: // May only be constructed from helper function in pair.cc Buffer(Pair* pair, int slot, void* ptr, size_t size); Pair* pair_; + // Empty buffer to use when a nullptr buffer is created. + char emptyBuf_[1]; + struct ibv_mr* mr_; + std::unique_ptr peerMr_; std::mutex m_; std::condition_variable recvCv_; diff --git a/gloo/transport/ibverbs/context.cc b/gloo/transport/ibverbs/context.cc index f3357b403..7c892eac5 100644 --- a/gloo/transport/ibverbs/context.cc +++ b/gloo/transport/ibverbs/context.cc @@ -11,6 +11,7 @@ #include "gloo/common/error.h" #include "gloo/transport/ibverbs/device.h" #include "gloo/transport/ibverbs/pair.h" +#include "gloo/transport/ibverbs/unbound_buffer.h" namespace gloo { namespace transport { @@ -23,16 +24,26 @@ Context::~Context() {} std::unique_ptr& Context::createPair(int rank) { pairs_[rank] = std::unique_ptr( - new ibverbs::Pair(device_, getTimeout())); + new ibverbs::Pair(rank, device_, getTimeout())); return pairs_[rank]; } std::unique_ptr Context::createUnboundBuffer( void* ptr, size_t size) { - GLOO_THROW_INVALID_OPERATION_EXCEPTION( - "Unbound buffers not supported yet for ibverbs transport"); - return std::unique_ptr(); + return std::make_unique(this->shared_from_this(), ptr, size); +} + +void Context::signalException(const std::string& msg) { + // The `pairs_` vector is logically constant. After the context and + // all of its pairs have been created it is not mutated until the + // context is destructed. 
Therefore, we don't need to acquire this + // context's instance lock before looping over `pairs_`. + for (auto& pair : pairs_) { + if (pair) { + reinterpret_cast(pair.get())->signalIoFailure(msg); + } + } } } // namespace ibverbs diff --git a/gloo/transport/ibverbs/context.h b/gloo/transport/ibverbs/context.h index e573026f6..50757cfbe 100644 --- a/gloo/transport/ibverbs/context.h +++ b/gloo/transport/ibverbs/context.h @@ -33,10 +33,16 @@ class Context : public ::gloo::transport::Context, void* ptr, size_t size) override; + // Set exception on every pair in this context. This is called when + // waiting for a send or recv operation on an unbound buffer times + // out. All pairs should be signaled and closed in that event. + void signalException(const std::string& msg); + protected: std::shared_ptr device_; friend class Pair; + friend class UnboundBuffer; }; } // namespace ibverbs diff --git a/gloo/transport/ibverbs/device.cc b/gloo/transport/ibverbs/device.cc index 9fcb9d884..9c18a7b82 100644 --- a/gloo/transport/ibverbs/device.cc +++ b/gloo/transport/ibverbs/device.cc @@ -75,16 +75,29 @@ std::shared_ptr<::gloo::transport::Device> CreateDevice( } std::vector names; for (auto i = 0; i < devices.size(); i++) { + GLOO_DEBUG( + "found candidate device ", + devices[i]->name, + " dev=", + devices[i]->dev_name); names.push_back(devices[i]->name); } std::sort(names.begin(), names.end()); attr.name = names[0]; } + GLOO_INFO( + "Using ibverbs device=", + attr.name, + " port=", + attr.port, + " index=", + attr.index); + // Look for specified device name ibv_context* context = nullptr; for (int i = 0; i < devices.size(); i++) { - if (attr.name == devices[i]->name) { + if (attr.name == devices[i]->name || attr.name == devices[i]->dev_name) { context = ibv_open_device(devices[i]); break; } @@ -132,13 +145,13 @@ Device::~Device() { loop_->join(); rv = ibv_destroy_comp_channel(comp_channel_); - GLOO_ENFORCE_EQ(rv, 0); + GLOO_ENFORCE_EQ(rv, 0, strerror(errno)); rv = ibv_dealloc_pd(pd_); - GLOO_ENFORCE_EQ(rv, 0); + GLOO_ENFORCE_EQ(rv, 0, strerror(errno)); rv = ibv_close_device(context_); - GLOO_ENFORCE_EQ(rv, 0); + GLOO_ENFORCE_EQ(rv, 0, strerror(errno)); } std::string Device::str() const { @@ -199,10 +212,15 @@ void Device::loop() { rv = ibv_get_cq_event(comp_channel_, &cq, &cqContext); GLOO_ENFORCE_EQ(rv, 0, "ibv_get_cq_event"); - // Completion queue context is a Pair*. - // Delegate handling of this event to the pair itself. - Pair* pair = static_cast(cqContext); - pair->handleCompletionEvent(); + try { + // Completion queue context is a Pair*. + // Delegate handling of this event to the pair itself. 
+ Pair* pair = static_cast(cqContext); + pair->handleCompletionEvent(); + } catch (const std::exception& ex) { + GLOO_ERROR("Exception while handling completion event: ", ex.what()); + throw; + } } } } // namespace ibverbs diff --git a/gloo/transport/ibverbs/device.h b/gloo/transport/ibverbs/device.h index 4eb480cbf..d664fe8c5 100644 --- a/gloo/transport/ibverbs/device.h +++ b/gloo/transport/ibverbs/device.h @@ -76,6 +76,7 @@ class Device : public ::gloo::transport::Device, friend class Pair; friend class Buffer; + friend class UnboundBuffer; }; } // namespace ibverbs diff --git a/gloo/transport/ibverbs/pair.cc b/gloo/transport/ibverbs/pair.cc index ae12cfd72..7812806c6 100644 --- a/gloo/transport/ibverbs/pair.cc +++ b/gloo/transport/ibverbs/pair.cc @@ -8,6 +8,7 @@ #include "gloo/transport/ibverbs/pair.h" #include "gloo/transport/ibverbs/buffer.h" +#include "gloo/transport/ibverbs/unbound_buffer.h" #include #include @@ -21,9 +22,11 @@ namespace transport { namespace ibverbs { Pair::Pair( + int rank, const std::shared_ptr& dev, std::chrono::milliseconds timeout) - : dev_(dev), + : rank_(rank), + dev_(dev), sync_(false), busyPoll_(false), timeout_(timeout), @@ -34,13 +37,15 @@ Pair::Pair( // Create completion queue { + GLOO_ENFORCE(dev_->context_, "context required"); + GLOO_ENFORCE(dev_->comp_channel_, "comp_channel required"); // Have to register this completion queue with the device's // completion channel to support asynchronous completion handling. // Pairs use asynchronous completion handling by default so // we call ibv_req_notify_cq(3) to request the first notification. cq_ = ibv_create_cq( dev_->context_, kCompletionQueueCapacity, this, dev_->comp_channel_, 0); - GLOO_ENFORCE(cq_); + GLOO_ENFORCE(cq_, "ibv_create_cq failed: ", strerror(errno)); // Arm notification mechanism for completion queue. rv = ibv_req_notify_cq(cq_, kNotifyOnAnyCompletion); @@ -111,6 +116,8 @@ Pair::Pair( } Pair::~Pair() { + std::lock_guard lock(m_); + int rv; // Acknowledge number of completion events handled by this @@ -227,6 +234,9 @@ void Pair::sendMemoryRegion(struct ibv_mr* src, int slot) { wr.send_flags = IBV_SEND_SIGNALED; wr.imm_data = slot; + GLOO_DEBUG( + "sendMemoryRegion slot=", slot, " addr=", src->addr, " lkey=", src->lkey); + // The work request is serialized and sent to the driver so it // doesn't need to be valid after the ibv_post_send call. struct ibv_send_wr* bad_wr = nullptr; @@ -238,12 +248,13 @@ void Pair::sendMemoryRegion(struct ibv_mr* src, int slot) { // Keep memory region around until this send operation completes. // They are posted in FIFO order, but may complete in arbitrary order. // Therefore we store them in a map keyed on the buffer slot. - GLOO_ENFORCE_EQ(mappedSendRegions_.count(slot), 0); - mappedSendRegions_[slot] = std::move(mr); + mappedSendRegions_[slot].emplace_back(std::move(mr)); } -const struct ibv_mr* Pair::getMemoryRegion(int slot) { - std::unique_lock lock(m_); +void Pair::recvMemoryRegion( + std::unique_lock& lock, + int slot, + std::function callback) { if (sync_) { auto it = peerMemoryRegions_.find(slot); auto start = std::chrono::steady_clock::now(); @@ -260,25 +271,16 @@ const struct ibv_mr* Pair::getMemoryRegion(int slot) { } it = peerMemoryRegions_.find(slot); } - return &it->second; + callback(it->second.front()); + it->second.pop_front(); } else { - auto pred = [&] { - return peerMemoryRegions_.find(slot) != peerMemoryRegions_.end(); - }; - if (timeout_ == kNoTimeout) { - // No timeout set. Wait for read to complete. 
- cv_.wait(lock, pred); + auto& q = peerMemoryRegions_[slot]; + if (q.empty()) { + recvMemoryRegionCallbacks_[slot].emplace_back(callback); } else { - auto done = cv_.wait_for(lock, timeout_, pred); - if (!done) { - signalIoFailure(GLOO_ERROR_MSG( - "Timeout waiting for memory region from ", peer_.str())); - GLOO_ENFORCE(false, "Unexpected code path"); - } + callback(q.front()); + q.pop_front(); } - auto it = peerMemoryRegions_.find(slot); - GLOO_ENFORCE(it != peerMemoryRegions_.end()); - return &it->second; } } @@ -304,7 +306,10 @@ Pair::createSendBuffer(int slot, void* ptr, size_t size) { std::unique_lock lock(m_); GLOO_ENFORCE_EQ(sendCompletionHandlers_.count(slot), 0); auto buffer = new Buffer(this, slot, ptr, size); - sendCompletionHandlers_[slot] = buffer; + auto& q = sendCompletionHandlers_[slot]; + q.clear(); + q.emplace_back(buffer); + return std::unique_ptr<::gloo::transport::Buffer>(buffer); } @@ -313,7 +318,9 @@ Pair::createRecvBuffer(int slot, void* ptr, size_t size) { std::unique_lock lock(m_); GLOO_ENFORCE_EQ(recvCompletionHandlers_.count(slot), 0); auto buffer = new Buffer(this, slot, ptr, size); - recvCompletionHandlers_[slot] = buffer; + auto& q = recvCompletionHandlers_[slot]; + q.clear(); + q.emplace_back(buffer); sendMemoryRegion(buffer->mr_, buffer->slot_); return std::unique_ptr<::gloo::transport::Buffer>(buffer); } @@ -321,21 +328,83 @@ Pair::createRecvBuffer(int slot, void* ptr, size_t size) { // Send from the specified buffer to remote side of pair. void Pair::send( transport::UnboundBuffer* tbuf, - uint64_t /* unused */, - size_t /* unused */, - size_t /* unused */) { - GLOO_THROW_INVALID_OPERATION_EXCEPTION( - "Unbound buffers not supported yet for ibverbs transport"); + uint64_t slot, + size_t offset, + size_t nbytes) { + std::unique_lock lock(m_); + GLOO_DEBUG("send tag=", slot, " offset=", offset, " nbytes=", nbytes); + + GLOO_ENFORCE(!sync_, "Cannot send in sync mode"); + + auto* buf = dynamic_cast(tbuf); + GLOO_ENFORCE_NE(buf, nullptr); + + sendCompletionHandlers_[slot].emplace_back(buf); + + auto mr = buf->mr_; + + recvMemoryRegion( + lock, + slot, + [this, mr, slot, ptr = buf->ptr, offset, nbytes]( + struct ibv_mr peer) mutable { + struct ibv_sge list; + list.addr = (uint64_t)ptr + offset; + list.length = nbytes; + list.lkey = mr->lkey; + + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = slot; + wr.sg_list = &list; + wr.num_sge = 1; + wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr.send_flags = IBV_SEND_SIGNALED; + wr.imm_data = slot; + + wr.wr.rdma.remote_addr = (uint64_t)peer.addr; + wr.wr.rdma.rkey = peer.rkey; + + GLOO_DEBUG( + "send UnboundBuffer async slot=", + wr.wr_id, + " peer_addr=", + peer.addr, + " remote_addr=", + std::hex, + wr.wr.rdma.remote_addr, + std::dec, + " rkey=", + wr.wr.rdma.rkey); + + struct ibv_send_wr* bad_wr; + auto rv = ibv_post_send(qp_, &wr, &bad_wr); + if (rv != 0) { + signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv)); + } + }); } // Receive into the specified buffer from the remote side of pair. 
void Pair::recv( transport::UnboundBuffer* tbuf, - uint64_t /* unused */, - size_t /* unused */, - size_t /* unused */) { - GLOO_THROW_INVALID_OPERATION_EXCEPTION( - "Unbound buffers not supported yet for ibverbs transport"); + uint64_t tag, + size_t offset, + size_t nbytes) { + std::unique_lock lock(m_); + + GLOO_DEBUG("recv tag=", tag, " offset=", offset, " nbytes=", nbytes); + + GLOO_ENFORCE(!sync_, "Cannot recv in sync mode"); + + auto* buf = dynamic_cast(tbuf); + GLOO_ENFORCE_NE(buf, nullptr); + + recvCompletionHandlers_[tag].emplace_back(buf); + + struct ibv_mr mr = *buf->mr_; + mr.addr = (void*)((uintptr_t)(mr.addr) + offset); + sendMemoryRegion(&mr, tag); } // place holder for future use @@ -390,7 +459,12 @@ void Pair::pollCompletions() { // Handle work completions for (int i = 0; i < nwc; i++) { checkErrorState(); - handleCompletion(&wc[i]); + try { + handleCompletion(&wc[i]); + } catch (const std::exception& ex) { + GLOO_ERROR("Exception in handleCompletion: ", ex.what()); + throw; + } } // Break unless wc was filled @@ -401,6 +475,15 @@ void Pair::pollCompletions() { } void Pair::handleCompletion(struct ibv_wc* wc) { + GLOO_DEBUG( + self_.str(), + "->", + peer_.str(), + ": handleCompletion id=", + wc->wr_id, + " opcode=", + wc->opcode); + if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) { // Incoming RDMA write completed. // Slot is encoded in immediate data on receive work completion. @@ -414,8 +497,11 @@ void Pair::handleCompletion(struct ibv_wc* wc) { ": ", ibv_wc_status_str(wc->status)); - GLOO_ENFORCE(recvCompletionHandlers_[slot] != nullptr); - recvCompletionHandlers_[slot]->handleCompletion(wc); + auto& q = recvCompletionHandlers_[slot]; + q.front()->handleCompletion(rank_, wc); + if (!q.front()->isPeristentHandler()) { + q.pop_front(); + } // Backfill receive work requests. postReceive(); @@ -433,9 +519,20 @@ void Pair::handleCompletion(struct ibv_wc* wc) { ": ", ibv_wc_status_str(wc->status)); - GLOO_ENFORCE(sendCompletionHandlers_[slot] != nullptr); - sendCompletionHandlers_[slot]->handleCompletion(wc); + auto& q = sendCompletionHandlers_[slot]; + q.front()->handleCompletion(rank_, wc); + if (!q.front()->isPeristentHandler()) { + q.pop_front(); + } } else if (wc->opcode == IBV_WC_RECV) { + GLOO_DEBUG( + self_.str(), + "->", + peer_.str(), + ": handleCompletion id=", + wc->wr_id, + " opcode=IBV_WC_RECV slot=", + wc->imm_data); // Memory region recv completed. // // Only used by the remote side of the pair to pass ibv_mr's. @@ -460,7 +557,14 @@ void Pair::handleCompletion(struct ibv_wc* wc) { // Move ibv_mr from memory region 'inbox' to final slot. const auto& mr = mappedRecvRegions_[recvPosted_ % kMaxBuffers]; - peerMemoryRegions_[slot] = mr->mr(); + + auto& q = recvMemoryRegionCallbacks_[slot]; + if (q.empty()) { + peerMemoryRegions_[slot].emplace_back(mr->mr()); + } else { + q.front()(mr->mr()); + q.pop_front(); + } // Notify any buffer waiting for the details of its remote peer. cv_.notify_all(); @@ -469,6 +573,15 @@ void Pair::handleCompletion(struct ibv_wc* wc) { postReceive(); } else if (wc->opcode == IBV_WC_SEND) { // Memory region send completed. 
+ + GLOO_DEBUG( + self_.str(), + "->", + peer_.str(), + ": handleCompletion id=", + wc->wr_id, + " opcode=IBV_WC_SEND"); + auto slot = wc->wr_id; GLOO_ENFORCE_EQ( wc->status, @@ -478,43 +591,67 @@ void Pair::handleCompletion(struct ibv_wc* wc) { ": ", ibv_wc_status_str(wc->status)); - GLOO_ENFORCE_GT(mappedSendRegions_.size(), 0); - GLOO_ENFORCE_EQ(mappedSendRegions_.count(slot), 1); - mappedSendRegions_.erase(slot); + mappedSendRegions_[slot].pop_front(); } else { GLOO_ENFORCE(false, "Unexpected completion with opcode: ", wc->opcode); } } void Pair::send(Buffer* buffer, size_t offset, size_t length, size_t roffset) { - struct ibv_sge list; - list.addr = (uint64_t)buffer->ptr_ + offset; - list.length = length; - list.lkey = buffer->mr_->lkey; - - struct ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = buffer->slot_; - wr.sg_list = &list; - wr.num_sge = 1; - wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; - wr.send_flags = IBV_SEND_SIGNALED; - wr.imm_data = buffer->slot_; - - const struct ibv_mr* peer = getMemoryRegion(buffer->slot_); - GLOO_ENFORCE_NE(peer, (const struct ibv_mr*)nullptr); - wr.wr.rdma.remote_addr = (uint64_t)peer->addr + roffset; - wr.wr.rdma.rkey = peer->rkey; + std::unique_lock lock(m_); - struct ibv_send_wr* bad_wr; - auto rv = ibv_post_send(qp_, &wr, &bad_wr); - if (rv != 0) { - signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv)); + auto send = + [this, buffer, offset, length, roffset](struct ibv_mr peer) mutable { + struct ibv_sge list; + list.addr = (uint64_t)buffer->ptr_ + offset; + list.length = length; + list.lkey = buffer->mr_->lkey; + + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = buffer->slot_; + wr.sg_list = &list; + wr.num_sge = 1; + wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr.send_flags = IBV_SEND_SIGNALED; + wr.imm_data = buffer->slot_; + + wr.wr.rdma.remote_addr = (uint64_t)peer.addr + roffset; + wr.wr.rdma.rkey = peer.rkey; + + GLOO_DEBUG( + "send Buffer async slot=", + wr.wr_id, + " peer_addr=", + peer.addr, + " remote_addr=", + std::hex, + wr.wr.rdma.remote_addr, + std::dec, + " rkey=", + wr.wr.rdma.rkey); + + struct ibv_send_wr* bad_wr; + auto rv = ibv_post_send(qp_, &wr, &bad_wr); + if (rv != 0) { + signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv)); + } + }; + + if (buffer->peerMr_ == nullptr) { + recvMemoryRegion( + lock, buffer->slot_, [send, buffer](struct ibv_mr peer) mutable { + buffer->peerMr_ = std::make_unique(peer); + send(peer); + }); + } else { + send(*buffer->peerMr_); } } void Pair::signalIoFailure(const std::string& msg) { std::lock_guard lock(m_); + GLOO_ERROR(msg); auto ex = ::gloo::IoException(msg); if (ex_ == nullptr) { // If we haven't seen an error yet, store the exception to throw on future @@ -523,12 +660,18 @@ void Pair::signalIoFailure(const std::string& msg) { // Loop through the completion handlers and signal that an error has // occurred. for (auto& it : recvCompletionHandlers_) { - GLOO_ENFORCE(it.second != nullptr); - it.second->signalError(ex_); + auto& q = it.second; + while (!q.empty()) { + q.front()->signalError(ex_); + q.pop_front(); + } } for (auto& it : sendCompletionHandlers_) { - GLOO_ENFORCE(it.second != nullptr); - it.second->signalError(ex_); + auto& q = it.second; + while (!q.empty()) { + q.front()->signalError(ex_); + q.pop_front(); + } } } // Finally, throw the exception on this thread. 
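The pair.cc changes above replace the one-handler-per-slot maps with per-slot deques of BufferHandler pointers: a completion is delivered to the handler at the front of the slot's queue, one-shot handlers (unbound buffers) are popped after delivery, and persistent handlers (bound Buffers, which return true from isPeristentHandler()) stay registered. A minimal standalone sketch of that dispatch rule, using illustrative names (Handler, Dispatcher) that are not part of the diff:

// Standalone sketch of the per-slot dispatch scheme introduced above.
// Each slot owns a deque of handlers; a completion is handed to the
// handler at the front, and one-shot handlers are consumed while
// persistent handlers remain for the next completion on that slot.
#include <deque>
#include <map>

struct Handler {
  virtual ~Handler() = default;
  virtual void onCompletion() = 0;
  // Bound buffers would report themselves as persistent; unbound
  // buffers use the default and are dequeued after one completion.
  virtual bool persistent() const { return false; }
};

class Dispatcher {
 public:
  void enqueue(int slot, Handler* h) {
    handlers_[slot].push_back(h);
  }
  void dispatch(int slot) {
    auto& q = handlers_[slot];
    if (q.empty()) {
      return; // nothing registered for this slot
    }
    q.front()->onCompletion();
    if (!q.front()->persistent()) {
      q.pop_front(); // one-shot handler is consumed
    }
  }

 private:
  std::map<int, std::deque<Handler*>> handlers_;
};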
diff --git a/gloo/transport/ibverbs/pair.h b/gloo/transport/ibverbs/pair.h index 3e4874add..8b9dbdbba 100644 --- a/gloo/transport/ibverbs/pair.h +++ b/gloo/transport/ibverbs/pair.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,18 @@ namespace ibverbs { // Forward declaration class Buffer; +class BufferHandler { + public: + virtual ~BufferHandler() = default; + + virtual void handleCompletion(int rank, struct ibv_wc* wc) = 0; + virtual void signalError(const std::exception_ptr& ex) = 0; + + virtual bool isPeristentHandler() { + return false; + } +}; + class Pair : public ::gloo::transport::Pair { static constexpr int kMaxBuffers = 8; static constexpr auto kRecvCompletionQueueCapacity = kMaxBuffers; @@ -46,6 +59,7 @@ class Pair : public ::gloo::transport::Pair { public: explicit Pair( + int rank, const std::shared_ptr& dev, std::chrono::milliseconds timeout); @@ -93,7 +107,11 @@ class Pair : public ::gloo::transport::Pair { void close() override; + void signalIoFailure(const std::string& msg); + protected: + const int rank_; + std::shared_ptr dev_; // Whether or not this pair is running in sync mode. @@ -120,7 +138,9 @@ class Pair : public ::gloo::transport::Pair { std::condition_variable cv_; // For us to copy the remote peer's ibv_mr into. - std::map peerMemoryRegions_; + std::map> peerMemoryRegions_; + std::map>> + recvMemoryRegionCallbacks_; // These fields store memory regions that the remote side of the pair // can send to and that the local side of the pair can send from. @@ -135,7 +155,7 @@ class Pair : public ::gloo::transport::Pair { // mappedRecvRegions_. These regions are referenced round-robin for // every posted receive work request. // - std::map> mappedSendRegions_; + std::map>> mappedSendRegions_; std::array, kMaxBuffers> mappedRecvRegions_; // Keep track of number of request work requests posted and completed. @@ -144,11 +164,14 @@ class Pair : public ::gloo::transport::Pair { uint64_t recvPosted_; // Completions on behalf of buffers need to be forwarded to those buffers. - std::map sendCompletionHandlers_; - std::map recvCompletionHandlers_; + std::map> sendCompletionHandlers_; + std::map> recvCompletionHandlers_; void sendMemoryRegion(struct ibv_mr* mr, int slot); - const struct ibv_mr* getMemoryRegion(int slot); + void recvMemoryRegion( + std::unique_lock& lock, + int slot, + std::function callback); void postReceive(); @@ -165,10 +188,10 @@ class Pair : public ::gloo::transport::Pair { bool closed_ = false; // Used to signal IO exceptions from one thread and propagate onto others. - void signalIoFailure(const std::string& msg); void checkErrorState(); friend class Buffer; + friend class UnboundBuffer; }; } // namespace ibverbs diff --git a/gloo/transport/ibverbs/unbound_buffer.cc b/gloo/transport/ibverbs/unbound_buffer.cc new file mode 100644 index 000000000..94b774ae1 --- /dev/null +++ b/gloo/transport/ibverbs/unbound_buffer.cc @@ -0,0 +1,260 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "gloo/transport/ibverbs/unbound_buffer.h" + +#include + +#include "gloo/common/error.h" +#include "gloo/common/logging.h" +#include "gloo/transport/ibverbs/context.h" + +namespace gloo { +namespace transport { +namespace ibverbs { + +UnboundBuffer::UnboundBuffer( + const std::shared_ptr& context, + void* ptr, + size_t size) + : ::gloo::transport::UnboundBuffer(ptr, size), + context_(context), + recvRank_(-1), + sendCompletions_(0), + sendRank_(-1), + shareableNonOwningPtr_(this) { + std::unique_lock lock(m_); + auto dev = context->device_; + + auto mr = ibv_reg_mr( + dev->pd_, + // Empty buffers still need a valid pointer and positive length otherwise + // IB throws an error. We use our size 1 empty buffer for this. + size == 0 ? static_cast(&emptyBuf_) : ptr, + size == 0 ? sizeof(emptyBuf_) : size, + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE); + + // Provide hint if the error is EFAULT and nv_peer_mem is not loaded + if (mr == nullptr && errno == EFAULT) { + if (!dev->hasNvPeerMem_) { + GLOO_ENFORCE( + mr != nullptr, + "ibv_reg_mr: ", + strerror(errno), + " (kernel module 'nv_peer_mem' not loaded;" + " did you specify a pointer to GPU memory?)"); + } + } + + // Provide hint if the error is ENOMEM + if (mr == nullptr && errno == ENOMEM) { + GLOO_ENFORCE( + mr != nullptr, + "ibv_reg_mr: ", + strerror(errno), + " (did you run into the locked memory limit?)"); + } + + GLOO_ENFORCE( + mr != nullptr, + "ibv_reg_mr: ", + strerror(errno), + " ptr=", + ptr, + " size=", + size); + + mr_ = mr; +} + +UnboundBuffer::~UnboundBuffer() { + std::lock_guard guard(m_); + + ibv_dereg_mr(mr_); +} + +void UnboundBuffer::abortWaitRecv() { + std::lock_guard guard(m_); + abortWaitRecv_ = true; + recvCv_.notify_one(); +} + +void UnboundBuffer::abortWaitSend() { + std::lock_guard guard(m_); + abortWaitSend_ = true; + sendCv_.notify_one(); +} + +void UnboundBuffer::handleCompletion(int rank, struct ibv_wc* wc) { + if (wc->opcode & IBV_WC_RECV) { + std::unique_lock lock(m_); + recvCompletions_.emplace_back(rank); + recvCv_.notify_one(); + } else if (wc->opcode == IBV_WC_RDMA_WRITE) { + std::unique_lock lock(m_); + sendCompletions_++; + sendPending_--; + sendCv_.notify_one(); + + GLOO_DEBUG( + "send complete sendPending=", + sendPending_, + " sendCompletions=", + sendCompletions_); + } else { + GLOO_ENFORCE(false, "Unexpected completion (opcode: ", wc->opcode, ")"); + } +} + +bool UnboundBuffer::waitRecv(int* rank, std::chrono::milliseconds timeout) { + // The device thread will signal completion. If the completion + // hasn't arrived yet, wait until it does. + auto pred = [&] { + throwIfException(); + return abortWaitRecv_ || recvCompletions_.size() > 0; + }; + std::unique_lock lock(m_); + if (timeout == kNoTimeout || timeout == kUnsetTimeout) { + // No timeout set. Wait for read to complete. + recvCv_.wait(lock, pred); + } else { + auto done = recvCv_.wait_for(lock, timeout, pred); + if (!done) { + // Release the mutex before calling into the pair to avoid deadlock. + // Calling signalIoFailure() will throw, so no need to + // reacquire. 
+ lock.unlock(); + + auto msg = GLOO_ERROR_MSG( + "Timed out waiting ", + timeout.count(), + "ms for recv operation to complete"); + context_->signalException(msg); + throw ::gloo::IoException(msg); + } + } + + if (abortWaitRecv_) { + // Reset to false, so that only this waitRecv is interrupted + abortWaitRecv_ = false; + return false; + } + + if (rank != nullptr) { + *rank = recvCompletions_.front(); + } + recvCompletions_.pop_front(); + + return true; +} + +bool UnboundBuffer::waitSend(int* rank, std::chrono::milliseconds timeout) { + // The device thread will signal completion. If the completion + // hasn't arrived yet, wait until it does. + std::unique_lock lock(m_); + throwIfException(); + if (sendCompletions_ == 0) { + GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for"); + auto pred = [&] { + throwIfException(); + return abortWaitSend_ || sendCompletions_ > 0; + }; + if (timeout == kNoTimeout || timeout == kUnsetTimeout) { + // No timeout set. Wait for read to complete. + sendCv_.wait(lock, pred); + } else { + auto done = sendCv_.wait_for(lock, timeout, pred); + if (!done) { + // Release the mutex before calling into the pair to avoid deadlock. + // Calling signalIoFailure() will throw, so no need to + // reacquire. + lock.unlock(); + auto msg = GLOO_ERROR_MSG( + "Timed out waiting ", + timeout.count(), + "ms for send operation to complete"); + context_->signalException(msg); + throw ::gloo::IoException(msg); + } + } + } + + if (abortWaitSend_) { + // Reset to false, so that only this waitSend is interrupted + abortWaitSend_ = false; + return false; + } + sendCompletions_--; + return true; +} + +void UnboundBuffer::send( + int dstRank, + uint64_t slot, + size_t offset, + size_t nbytes) { + // Default the number of bytes to be equal to the number + // of bytes remaining in the buffer w.r.t. the offset. + if (nbytes == kUnspecifiedByteCount) { + GLOO_ENFORCE_LE(offset, this->size); + nbytes = this->size - offset; + } + sendPending_++; + context_->getPair(dstRank)->send(this, slot, offset, nbytes); +} + +void UnboundBuffer::recv( + int srcRank, + uint64_t slot, + size_t offset, + size_t nbytes) { + // Default the number of bytes to be equal to the number + // of bytes remaining in the buffer w.r.t. the offset. + if (nbytes == kUnspecifiedByteCount) { + GLOO_ENFORCE_LE(offset, this->size); + nbytes = this->size - offset; + } + context_->getPair(srcRank)->recv(this, slot, offset, nbytes); +} + +void UnboundBuffer::recv( + std::vector srcRanks, + uint64_t slot, + size_t offset, + size_t nbytes) { + // Default the number of bytes to be equal to the number + // of bytes remaining in the buffer w.r.t. the offset. 
+ if (nbytes == kUnspecifiedByteCount) { + GLOO_ENFORCE_LT(offset, this->size); + nbytes = this->size - offset; + } + + GLOO_ENFORCE_EQ(srcRanks.size(), 1, "TODO: Only one src rank is supported"); + + for (auto rank : srcRanks) { + context_->getPair(rank)->recv(this, slot, offset, nbytes); + } +} + +void UnboundBuffer::signalError(const std::exception_ptr& ex) { + std::lock_guard lock(m_); + ex_ = ex; + recvCv_.notify_all(); + sendCv_.notify_all(); +} + +void UnboundBuffer::throwIfException() { + if (ex_ != nullptr) { + std::rethrow_exception(ex_); + } +} + +} // namespace ibverbs +} // namespace transport +} // namespace gloo diff --git a/gloo/transport/ibverbs/unbound_buffer.h b/gloo/transport/ibverbs/unbound_buffer.h new file mode 100644 index 000000000..e852b6cb3 --- /dev/null +++ b/gloo/transport/ibverbs/unbound_buffer.h @@ -0,0 +1,107 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "gloo/common/memory.h" +#include "gloo/transport/ibverbs/device.h" +#include "gloo/transport/ibverbs/pair.h" +#include "gloo/transport/unbound_buffer.h" + +#include +#include +#include +#include + +namespace gloo { +namespace transport { +namespace ibverbs { + +// Forward declaration +class Context; +class Pair; + +class UnboundBuffer : public ::gloo::transport::UnboundBuffer, + public BufferHandler { + public: + UnboundBuffer( + const std::shared_ptr& context, + void* ptr, + size_t size); + + virtual ~UnboundBuffer(); + + // If specified, the source of this recv is stored in the rank pointer. + // Returns true if it completed, false if it was aborted. + bool waitRecv(int* rank, std::chrono::milliseconds timeout) override; + + // If specified, the destination of this send is stored in the rank pointer. + // Returns true if it completed, false if it was aborted. + bool waitSend(int* rank, std::chrono::milliseconds timeout) override; + + // Aborts a pending waitRecv call. + void abortWaitRecv() override; + + // Aborts a pending waitSend call. + void abortWaitSend() override; + + void send(int dstRank, uint64_t slot, size_t offset, size_t nbytes) override; + + void recv(int srcRank, uint64_t slot, size_t offset, size_t nbytes) override; + + void recv( + std::vector srcRanks, + uint64_t slot, + size_t offset, + size_t nbytes) override; + + void handleCompletion(int rank, struct ibv_wc* wc); + // Set exception and wake up any waitRecv/waitSend threads. + void signalError(const std::exception_ptr&) override; + + protected: + std::shared_ptr context_; + + // Empty buffer to use when a nullptr buffer is created. + char emptyBuf_[1]; + + std::mutex m_; + std::condition_variable recvCv_; + std::condition_variable sendCv_; + bool abortWaitRecv_{false}; + bool abortWaitSend_{false}; + + struct ibv_mr* mr_; + + std::deque recvCompletions_; + int recvRank_; + int sendCompletions_; + int sendRank_; + int sendPending_{0}; + + std::exception_ptr ex_; + + // Throws if an exception if set. + void throwIfException(); + + // Allows for sharing weak (non owning) references to "this" without + // affecting the lifetime of this instance. + ShareableNonOwningPtr shareableNonOwningPtr_; + + // Returns weak reference to "this". See pair.{h,cc} for usage. 
+  inline WeakNonOwningPtr<UnboundBuffer> getWeakNonOwningPtr() const {
+    return WeakNonOwningPtr<UnboundBuffer>(shareableNonOwningPtr_);
+  }
+
+  friend class Context;
+  friend class Pair;
+};
+
+} // namespace ibverbs
+} // namespace transport
+} // namespace gloo
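Taken together, these changes let unbound buffers run over the ibverbs transport instead of throwing. A hedged end-to-end sketch of how the new path could be exercised between two ranks; the FileStore path, attr values, slot number, and buffer size are illustrative, and the exact connectFullMesh/wait overloads may differ slightly between gloo versions:

// Usage sketch (not part of the diff): exchange data over the ibverbs
// transport using unbound buffers, mirroring the test helper above.
#include <cstdint>
#include <memory>
#include <vector>

#include "gloo/rendezvous/context.h"
#include "gloo/rendezvous/file_store.h"
#include "gloo/transport/ibverbs/device.h"
#include "gloo/transport/unbound_buffer.h"

void runRank(int rank, int size) {
  // Pick an ibverbs device; port/index values are illustrative.
  gloo::transport::ibverbs::attr attr;
  attr.port = 1;
  attr.index = 0;
  auto device = gloo::transport::ibverbs::CreateDevice(attr);

  // Rendezvous over a shared filesystem path (illustrative).
  auto context = std::make_shared<gloo::rendezvous::Context>(rank, size);
  gloo::rendezvous::FileStore store("/tmp/gloo");
  context->connectFullMesh(store, device);

  // Unbound buffers are now backed by ibverbs memory regions.
  std::vector<char> data(1024);
  auto buf = context->createUnboundBuffer(data.data(), data.size());

  constexpr uint64_t slot = 0;
  const int peer = (rank + 1) % size;
  if (rank == 0) {
    buf->send(peer, slot);
    buf->waitSend();
  } else {
    buf->recv(peer, slot);
    buf->waitRecv();
  }
}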