Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions torch/csrc/distributed/c10d/TCPStoreLibUvBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,14 @@ class UvTcpServer : public UvTcpSocket {
int uv_res = uv_tcp_open((uv_tcp_t*)res->unsafeGetStream(), socket);
TORCH_CHECK(
uv_res == 0,
"Failed to open existing socket. socket:{} code:{} name:{} message:{}",
"Failed to open existing socket. ",
"socket: ",
socket,
", code: ",
uv_res,
", name: ",
uv_err_name(uv_res),
", message: ",
uv_strerror(uv_res));

res->cacheSocketPort();
Expand Down Expand Up @@ -221,30 +225,42 @@ class UvTcpServer : public UvTcpSocket {
}
TORCH_CHECK(
uv_res == 0,
"UV Store addr parsing failure. useIpv6:{} code:{} name:{} message:{}",
"UV Store addr parsing failure. ",
"useIpv6: ",
useIpv6,
", code: ",
uv_res,
", name: ",
uv_err_name(uv_res),
", message: ",
uv_strerror(uv_res));

uv_res =
uv_tcp_bind(res->unsafeGetSocket(), (const struct sockaddr*)&addr, 0);
TORCH_CHECK(
uv_res == 0,
"UV Store bind failed. useIpv6:{} code:{} name:{} message:{}",
"The server socket has failed to bind. ",
"useIpv6: ",
useIpv6,
", code: ",
uv_res,
", name: ",
uv_err_name(uv_res),
", message: ",
uv_strerror(uv_res));

uv_res =
uv_listen(res->unsafeGetStream(), DEFAULT_BACKLOG, on_new_connection);
TORCH_CHECK(
uv_res == 0,
"UV Store listen failed. useIpv6:{} code:{} name:{} message:{}",
"The server socket has failed to listen on any local network address. ",
"useIpv6: ",
useIpv6,
", code: ",
uv_res,
", name: ",
uv_err_name(uv_res),
", message: ",
uv_strerror(uv_res));

res->cacheSocketPort();
Expand All @@ -265,9 +281,12 @@ class UvTcpServer : public UvTcpSocket {
uv_accept(unsafeGetStream(), (uv_stream_t*)socket->unsafeGetHandle());
TORCH_CHECK(
res == 0,
"Failed to accept socket. code:{} name:{} desc:{}.",
"Failed to accept socket. ",
"code: ",
res,
", name: ",
uv_err_name(res),
", message: ",
uv_strerror(res));
}

Expand Down Expand Up @@ -458,9 +477,12 @@ class ChunkedStream {
if (buff_idx >= buffers.size() && remaining > 0) {
TORCH_CHECK(
false,
"Trying to read past end of buffer buffer_idx:{} available:{} remaining:{}",
"Trying to read past end of buffer. ",
"buffer_idx: ",
buff_idx,
", available: ",
buffers.size(),
", remaining: ",
remaining);
}
}
Expand Down Expand Up @@ -498,8 +520,10 @@ class ChunkedStream {
return false;
TORCH_CHECK(
size <= MAX_STRING_LEN,
"Invalid string size. size:{} max:{}",
"Invalid string size. ",
"size: ",
size,
", max: ",
MAX_STRING_LEN);

if (available() < size)
Expand All @@ -515,8 +539,10 @@ class ChunkedStream {
auto size_in_bytes = size * sizeof(uint8_t);
TORCH_CHECK(
size_in_bytes <= MAX_PAYLOAD_LEN,
"Invalid payload size. size: {} max:{}",
"Invalid payload size. ",
"size: ",
size_in_bytes,
", max: ",
MAX_PAYLOAD_LEN);

if (available() < size_in_bytes)
Expand Down Expand Up @@ -782,8 +808,10 @@ class UvClient : public UvTcpSocket {
return false;
TORCH_CHECK(
key_count <= MAX_KEY_COUNT,
"Too many keys being waited. keys:{} max:{}",
"Too many keys being waited. ",
"keys: ",
key_count,
", max: ",
MAX_KEY_COUNT);

std::vector<std::string> keys(key_count);
Expand All @@ -810,8 +838,10 @@ class UvClient : public UvTcpSocket {
}
TORCH_CHECK(
key_count <= MAX_KEY_COUNT,
"Too many keys being waited. keys:{} max:{}",
"Too many keys being waited. ",
"keys: ",
key_count,
", max: ",
MAX_KEY_COUNT);

std::vector<std::string> keys(key_count);
Expand Down Expand Up @@ -872,8 +902,10 @@ class UvClient : public UvTcpSocket {
}
TORCH_CHECK(
key_count <= MAX_KEY_COUNT,
"Too many keys with multi_get. keys:{} max:{}",
"Too many keys with multi_get. ",
"keys: ",
key_count,
", max: ",
MAX_KEY_COUNT);

StreamWriter sw(iptr());
Expand All @@ -898,8 +930,10 @@ class UvClient : public UvTcpSocket {
}
TORCH_CHECK(
key_count <= MAX_KEY_COUNT,
"Too many keys with multi_get. keys:{} max:{}",
"Too many keys with multi_get. ",
"keys: ",
key_count,
", max: ",
MAX_KEY_COUNT);

for (const auto _ : c10::irange(key_count)) {
Expand Down Expand Up @@ -988,9 +1022,11 @@ void LibUVStoreDaemon::init(const TCPStoreOptions& opts) {
port_ = tcpServer->port();
TORCH_CHECK(
port_ == opts.port || opts.port == 0, // zero means use any port
"listen fd {} is bound to port {}, expected to be bound to port {}",
"listen fd ",
*opts.masterListenFd,
" is bound to port ",
port_,
", expected to be bound to port ",
opts.port);
}

Expand Down