From 1ff67a5f65916a2f01a6be79cd4fadeddfc45c9e Mon Sep 17 00:00:00 2001 From: xilunwu Date: Mon, 10 Feb 2025 15:16:57 -0800 Subject: [PATCH 1/2] allow seq_number == global_rank for TCP backend Summary: All credit goes to original author XilunWu. I am just landing the code to unblock large Ads jobs. D45740631 reduces gloo rendezvous cost for TCP backend by eliminating duplicate address publishing to TCPStore. Ben suggested "have seq_number == global_rank" to further get rid of `seq_number` exchange and Shawn reported why this didn't work. This diff serves a starting point for benchmarking the benefit of doing so ([testbed record](https://docs.google.com/document/d/1_p390fx0IiaZWbt-Dkdvp9jSgCKebiuG8_a6BVjHtMU/edit) shows 2x speedup: 46 min ProcessGroupGloo init time on 8k ranks -> 20 min). The feature will be enabled via env variable (GLOO_ENABLE_RANK_AS_SEQUENCE_NUMBER) disabled by default that will be controlled by justKnobs. Differential Revision: D48130088 --- gloo/transport/tcp/listener.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/gloo/transport/tcp/listener.cc b/gloo/transport/tcp/listener.cc index 038f376ea..5e514c0e6 100644 --- a/gloo/transport/tcp/listener.cc +++ b/gloo/transport/tcp/listener.cc @@ -15,6 +15,7 @@ #include #include #include +# namespace gloo { namespace transport { From 06f24f534978b9d03edb2e80a1986c0723938e4a Mon Sep 17 00:00:00 2001 From: Boris Sarana Date: Tue, 11 Feb 2025 14:51:00 -0800 Subject: [PATCH 2/2] Use multi_get for store that has extended API support. (#408) Summary: Pull Request resolved: https://github.com/facebookincubator/gloo/pull/408 The TCP store has API v2 support we can reduce the network overhead of Gloo rendezvous significantly by fetching a batch of key instead of doing them one by one. Initial testing shows ~15X improvement for 4k jobs. Gloo process group init: Baseline ( fbcode trunk): 2k job (https://fburl.com/mlhub/x1prxu89) : ~82sec (~1.4 min) 4k job (https://fburl.com/mlhub/v1djk4n5) : ~393 sec (~6.6min) 8k job (https://fburl.com/mlhub/cagqrs7m): (~55mins) With optimizations (D48130088 + D52083376): 2k job (https://fburl.com/mlhub/x0cskdag) : ~18 sec ( ~5x faster) 4k job (https://fburl.com/mlhub/xzmvkm4j) : ~ 25 sec (~15x faster) 8k job (https://fburl.com/mlhub/gdyeizv9) : ~ 85 sec (~35x faster) Reviewed By: xunnanxu Differential Revision: D52083376 --- gloo/common/store.h | 15 ++++++++++++ gloo/common/utils.cc | 6 +++++ gloo/common/utils.h | 2 ++ gloo/transport/tcp/context.cc | 46 ++++++++++++++++++++++++++--------- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/gloo/common/store.h b/gloo/common/store.h index 010d81587..cff42572c 100644 --- a/gloo/common/store.h +++ b/gloo/common/store.h @@ -25,6 +25,21 @@ class IStore { virtual void wait( const std::vector& keys, const std::chrono::milliseconds& timeout) = 0; + + // Extended 2.0 API support + virtual bool has_v2_support() = 0; + + virtual std::vector> multi_get( + const std::vector& keys) = 0; + + virtual void multi_set( + const std::vector& keys, + const std::vector>& values) = 0; + + virtual void append( + const std::string& key, + const std::vector& value) = 0; + virtual int64_t add(const std::string& key, int64_t value) = 0; }; } // namespace gloo diff --git a/gloo/common/utils.cc b/gloo/common/utils.cc index 7b3172d0b..d543d2a0f 100644 --- a/gloo/common/utils.cc +++ b/gloo/common/utils.cc @@ -36,4 +36,10 @@ bool useRankAsSeqNumber() { (std::string(res) == "True" || std::string(res) == "1"); } +bool isStoreExtendedApiEnabled() { + const auto& res = std::getenv("GLOO_ENABLE_STORE_V2_API"); + return res != nullptr && + (std::string(res) == "True" || std::string(res) == "1"); +} + } // namespace gloo diff --git a/gloo/common/utils.h b/gloo/common/utils.h index 343c3fab9..185ebaf19 100644 --- a/gloo/common/utils.h +++ b/gloo/common/utils.h @@ -16,4 +16,6 @@ std::string getHostname(); bool useRankAsSeqNumber(); +bool isStoreExtendedApiEnabled(); + } // namespace gloo diff --git a/gloo/transport/tcp/context.cc b/gloo/transport/tcp/context.cc index 20cf97e29..5140fd2fe 100644 --- a/gloo/transport/tcp/context.cc +++ b/gloo/transport/tcp/context.cc @@ -8,10 +8,12 @@ #include "gloo/transport/tcp/context.h" +#include +#include #include #include +#include -#include "gloo/common/error.h" #include "gloo/common/logging.h" #include "gloo/common/utils.h" #include "gloo/transport/tcp/device.h" @@ -22,6 +24,8 @@ namespace gloo { namespace transport { namespace tcp { +constexpr int kDefaultBatchSize = 128; + Context::Context(std::shared_ptr device, int rank, int size) : ::gloo::transport::Context(rank, size), device_(std::move(device)) {} @@ -78,12 +82,36 @@ void Context::createAndConnectAllPairs(IStore& store) { // which does not have the rank info hosted at a higher `Pair` level). // So better safe than sorry for now we try to minimize the changeset needed. const auto& currentRankPair = getPair(rank); - auto deviceAddress = Address( + const auto& deviceAddress = Address( static_cast(currentRankPair.get())->address().getSockaddr()); Rank currentRankInfo( localHostName, deviceAddress.bytes(), std::move(pairIdentifiers)); store.set(std::to_string(rank), currentRankInfo.bytes()); + std::vector> remoteRankInfos; + int key = 0; + if (isStoreExtendedApiEnabled() && store.has_v2_support()) { + auto sizeRemaining = size; + while (sizeRemaining > 0) { + const auto batchKeys = std::min(kDefaultBatchSize, sizeRemaining); + std::vector keys(batchKeys); + std::generate_n( + keys.begin(), batchKeys, [&] { return std::to_string(key++); }); + const auto& batchRemoteInfos = store.multi_get(keys); + remoteRankInfos.insert( + remoteRankInfos.end(), + batchRemoteInfos.begin(), + batchRemoteInfos.end()); + sizeRemaining -= batchKeys; + } + } else { + std::generate_n(std::back_inserter(remoteRankInfos), size, [&] { + const auto& keyStr = std::to_string(key++); + store.wait({keyStr.c_str()}, getTimeout()); + return store.get(keyStr); + }); + } + // Connect every pair for (int i = 0; i < size; i++) { if (i == rank) { @@ -95,16 +123,9 @@ void Context::createAndConnectAllPairs(IStore& store) { continue; } - // Wait for address of other side of this pair to become available - std::ostringstream key; - key << i; - store.wait({key.str()}, getTimeout()); + Rank remoteRankInfo(remoteRankInfos[i]); - // Connect to other side of this pair - std::vector rankInfoBytes = store.get(key.str()); - Rank remoteRankInfo(rankInfoBytes); - const auto& remoteHostname = remoteRankInfo.hostname; - if (!localRankSet && remoteHostname == localHostName) { + if (!localRankSet && remoteRankInfo.hostname == localHostName) { ++localRank; } @@ -112,7 +133,8 @@ void Context::createAndConnectAllPairs(IStore& store) { auto remoteDeviceAddr = Address(remoteRankInfo.addressBytes).getSockaddr(); auto remoteAddr = Address( remoteDeviceAddr, - useRankAsSeqNum ? (ssize_t)rank : remoteRankInfo.pairIdentifiers[rank]); + useRankAsSeqNum ? (sequence_number_t)rank + : remoteRankInfo.pairIdentifiers[rank]); pair->connect(remoteAddr.bytes()); }