Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gloo/test/tcp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ TEST(TcpTest, ConnectTimeout) {
EXPECT_TRUE(e);
EXPECT_TRUE(dynamic_cast<const TimeoutError*>(&e));
};
connectLoop(loop, remote, timeout, std::move(fn));
connectLoop(loop, remote, 0, 5, timeout, std::move(fn));

std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&] { return done; });
Expand Down
3 changes: 3 additions & 0 deletions gloo/transport/tcp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ else()
"${CMAKE_CURRENT_SOURCE_DIR}/address.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/buffer.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/context.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/debug_logger.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/device.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/error.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/helpers.cc"
Expand All @@ -19,6 +20,8 @@ else()
"${CMAKE_CURRENT_SOURCE_DIR}/attr.h"
"${CMAKE_CURRENT_SOURCE_DIR}/buffer.h"
"${CMAKE_CURRENT_SOURCE_DIR}/context.h"
"${CMAKE_CURRENT_SOURCE_DIR}/debug_data.h"
"${CMAKE_CURRENT_SOURCE_DIR}/debug_logger.h"
"${CMAKE_CURRENT_SOURCE_DIR}/device.h"
"${CMAKE_CURRENT_SOURCE_DIR}/error.h"
"${CMAKE_CURRENT_SOURCE_DIR}/helpers.h"
Expand Down
23 changes: 23 additions & 0 deletions gloo/transport/tcp/debug_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.

#include <string>
#pragma once

namespace gloo {
namespace transport {
namespace tcp {

struct ConnectDebugData {
const int retryCount;
const int retryLimit;
const bool willRetry;
const int glooRank;
const int glooSize;
const std::string error;
const std::string remote;
const std::string local;
};

} // namespace tcp
} // namespace transport
} // namespace gloo
30 changes: 30 additions & 0 deletions gloo/transport/tcp/debug_logger.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <gloo/common/logging.h>
#include <gloo/transport/tcp/debug_logger.h>

namespace gloo {
namespace transport {
namespace tcp {

void DebugLogger::log(const ConnectDebugData& data) {
GLOO_ERROR(
"failed to connect, willRetry=",
data.willRetry,
", retry=",
data.retryCount,
", retryLimit=",
data.retryLimit,
", rank=",
data.glooRank,
", size=",
data.glooSize,
", local=",
data.local,
", remote=",
data.remote,
", error=",
data.error);
}

} // namespace tcp
} // namespace transport
} // namespace gloo
20 changes: 20 additions & 0 deletions gloo/transport/tcp/debug_logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.

#pragma once

#include <gloo/transport/tcp/debug_data.h>

namespace gloo {
namespace transport {
namespace tcp {

class DebugLogger {
public:
static void log(const ConnectDebugData& data);

private:
};

} // namespace tcp
} // namespace transport
} // namespace gloo
8 changes: 7 additions & 1 deletion gloo/transport/tcp/device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,14 @@ bool Device::isInitiator(const Address& local, const Address& remote) const {
void Device::connect(
const Address& local,
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
connect_callback_t fn) {
auto initiator = isInitiator(local, remote);

if (initiator) {
connectAsInitiator(remote, timeout, std::move(fn));
connectAsInitiator(remote, rank, size, timeout, std::move(fn));
return;
}
connectAsListener(local, timeout, std::move(fn));
Expand Down Expand Up @@ -335,6 +337,8 @@ void Device::connectAsListener(
//
void Device::connectAsInitiator(
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
connect_callback_t fn) {
auto writeSeq = [loop = loop_, seq = remote.getSeq()](
Expand All @@ -357,6 +361,8 @@ void Device::connectAsInitiator(
connectLoop(
loop_,
remote,
rank,
size,
timeout,
[loop = loop_, fn = std::move(fn), writeSeq = std::move(writeSeq)](
std::shared_ptr<Socket> socket, const Error& error) {
Expand Down
4 changes: 4 additions & 0 deletions gloo/transport/tcp/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class Device : public ::gloo::transport::Device,
void connect(
const Address& local,
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
connect_callback_t fn);

Expand All @@ -106,6 +108,8 @@ class Device : public ::gloo::transport::Device,

void connectAsInitiator(
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
connect_callback_t fn);

Expand Down
4 changes: 3 additions & 1 deletion gloo/transport/tcp/helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ namespace tcp {
void connectLoop(
std::shared_ptr<Loop> loop,
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
typename ConnectOperation::callback_t fn) {
auto x = std::make_shared<ConnectOperation>(
std::move(loop), remote, timeout, std::move(fn));
std::move(loop), remote, rank, size, timeout, std::move(fn));
x->run();
}

Expand Down
27 changes: 20 additions & 7 deletions gloo/transport/tcp/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
#include <memory>

#include <gloo/common/logging.h>
#include <gloo/transport/tcp/debug_data.h>
#include <gloo/transport/tcp/error.h>
#include <gloo/transport/tcp/loop.h>
#include <gloo/transport/tcp/socket.h>
#include "gloo/transport/tcp/debug_logger.h" // @manual=//gloo:debug_logger

namespace gloo {
namespace transport {
Expand Down Expand Up @@ -180,9 +182,13 @@ class ConnectOperation final
ConnectOperation(
std::shared_ptr<Loop> loop,
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
callback_t fn)
: remote_(remote),
rank_(rank),
size_(size),
deadline_(std::chrono::steady_clock::now() + timeout),
loop_(std::move(loop)),
fn_(std::move(fn)) {}
Expand Down Expand Up @@ -230,15 +236,18 @@ class ConnectOperation final
SystemError e("SO_ERROR", result, remote_);
bool willRetry = std::chrono::steady_clock::now() < deadline_ &&
retry_++ < maxRetries_;
GLOO_ERROR(
"failed to connect, willRetry=",
willRetry,
", retry=",

auto debugData = ConnectDebugData{
retry_,
", remote=",
maxRetries_,
willRetry,
rank_,
size_,
e.what(),
remote_.str(),
", error=",
e.what());
socket_->sockName().str(),
};
DebugLogger::log(debugData);
// check deadline
if (willRetry) {
run();
Expand All @@ -253,6 +262,8 @@ class ConnectOperation final

private:
const Address remote_;
const int rank_;
const int size_;
const std::chrono::time_point<std::chrono::steady_clock> deadline_;
const int maxRetries_{3};

Expand All @@ -269,6 +280,8 @@ class ConnectOperation final
void connectLoop(
std::shared_ptr<Loop> loop,
const Address& remote,
const int rank,
const int size,
std::chrono::milliseconds timeout,
typename ConnectOperation::callback_t fn);

Expand Down
2 changes: 2 additions & 0 deletions gloo/transport/tcp/pair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ void Pair::connect(const std::vector<char>& bytes) {
device_->connect(
self_,
peer,
context_->rank,
context_->size,
timeout_,
std::bind(
&Pair::connectCallback,
Expand Down
Loading