New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
C10D: Added TCPStore to support C10D store interface #7560
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like most of the code is coming straight from THD, is that right? Can you please post diffs comparing them to the original files? That would make reviewing much easier.
torch/lib/c10d/Utils.cpp
Outdated
int connect(const std::string& address, | ||
PortType port, | ||
bool wait, | ||
int timeout) { |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
|
||
bytesToReceive -= bytesReceived; | ||
currentBytes += bytesReceived; | ||
} |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/test/TcpStoreTest.cpp
Outdated
c10d::test::set(*slaveStores[i], key, val); | ||
c10d::test::check(*slaveStores[i], key, val); | ||
} | ||
}))); |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
@apaszke Utils.cpp/hpp are mostly from THD with some refactoring. TcpStore.hpp/cpp are based on architecture of Gloo store, but the API implementation needs some changes to support these new functions like wait with timeout, add, check functions that are defined in the new base store class. I sent you a diff file internally on messenger. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work @teng-li!
General comments:
- Can we name it TCPStore instead?
- There is inconsistency in style, w.r.t. double newlines, braces on newline vs end of line, function argument indentation, namespace comments.
- A few comments below talk about this: I think we should not let the exception propagation determine shutdown of the daemon thread. It will happen one node dies for whatever reason and this will immediately propagate everywhere. Instead it should just go away, be removed from the fd vector, and the daemon should keep running.
torch/lib/c10d/TcpStore.cpp
Outdated
keysAwaited_.push_back(0); | ||
fds.push_back({ .fd = sockFd, .events = POLLIN }); | ||
} | ||
for (size_t rank = 0; rank < sockets_.size(); rank++) { |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
|
||
SYSCHECK(::poll(fds.data(), fds.size(), -1)); | ||
|
||
if (fds[0].revents != 0) { |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
|
||
if (fds[0].revents != 0) { | ||
if (fds[0].revents ^ POLLIN) { | ||
throw std::system_error(ECONNABORTED, std::system_category()); |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
* side has been closed. If the closing was due to normal exit, then the | ||
* store should exit too. Otherwise, if it was different exception, | ||
* other processes will get an exception once they try to use the store. | ||
*/ |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
} | ||
|
||
void TcpStoreDaemon::join() { | ||
daemonThread_.join(); |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
|
||
{ | ||
if (isServer_) { | ||
// Openning up the listening socket |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
return false; | ||
} else { | ||
throw std::runtime_error("stop_waiting or keep_waiting response expected"); | ||
} |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
* or, in the case of wait | ||
* type of query | number of args | size of arg1 | arg1 | ... | ||
*/ | ||
void TcpStoreDaemon::query(RankType rank) { |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
void TcpStore::set(const std::string& key, const std::vector<uint8_t>& data) { | ||
tcputil::sendValue<QueryType>(storeSocket_, QueryType::SET); | ||
tcputil::sendString(storeSocket_, key, true); | ||
tcputil::sendVector<uint8_t>(storeSocket_, data); |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/test/CMakeLists.txt
Outdated
@@ -13,4 +14,5 @@ foreach(test_src ${test_srcs}) | |||
target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/..) | |||
target_link_libraries(${test_name} ${test_libraries}) | |||
add_test(NAME ${test_name} COMMAND $<TARGET_FILE:${test_name}>) | |||
|
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
@pietern Changed the file name to TCPStore, but somehow the new commit cannot see the diff :(, I can revert the name back so that you can see the diff and later rename it. Anyway, addressed all of your comments, like properly shutdown the daemon using pipes etc. @apaszke addressed your comments too, |
bbd1059
to
e11d975
Compare
torch/lib/c10d/TcpStore.cpp
Outdated
ret = false; | ||
it++; | ||
} else { | ||
it = keys.erase(it); |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
} | ||
/* sleep override */ | ||
std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||
} |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
} | ||
|
||
// send a string's length and data | ||
inline void sendString(int socket, |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/Utils.hpp
Outdated
const std::chrono::milliseconds& timeout = kNoTimeout); | ||
|
||
// Helper resource guard class | ||
class ResourceGuard { |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/test/TCPStoreTest.cpp
Outdated
std::string val = "thread_val_" + | ||
std::to_string(numIterations - 1); | ||
c10d::test::check(*slaveStores[i], key, val); | ||
} |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few instances of the style inconsistencies I was talking about in the previous review. There are functions that start with a newline and ones that don't. Also the mixed use of // and /**/ is odd on the eyes.
torch/lib/c10d/TcpStore.cpp
Outdated
} | ||
|
||
TCPStoreDaemon::~TCPStoreDaemon() { | ||
|
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
} | ||
|
||
void TCPStoreDaemon::run() { | ||
|
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TcpStore.cpp
Outdated
void TCPStoreDaemon::run() { | ||
|
||
// Create the control pipe | ||
controlPipeFd_ = std::vector<int>{-1, -1}; |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/Utils.cpp
Outdated
return listenPort; | ||
} | ||
|
||
} |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/Utils.hpp
Outdated
return static_cast<RankType>(rank); | ||
} | ||
|
||
// TCP util namespace |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/test/TCPStoreTest.cpp
Outdated
c10d::test::Semaphore sem1, sem2; | ||
|
||
// Each thread will have a slave store to send/recv data | ||
std::vector<std::unique_ptr<c10d::TCPStore>> slaveStores; |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Micro nit remaining
torch/lib/c10d/TcpStore.cpp
Outdated
} | ||
|
||
bool TCPStoreDaemon:: | ||
checkKeys(const std::vector<std::string>& keys) const { |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
torch/lib/c10d/TCPStore.cpp
Outdated
// we hit an exception here. | ||
::close(fds[fdIdx].fd); | ||
fds.erase(fds.begin() + fdIdx); | ||
sockets_.erase(sockets_.begin() + fdIdx - 2); |
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
This comment was marked as off-topic.
This comment was marked as off-topic.
Sorry, something went wrong.
e5f7c5c
to
0171a57
Compare
* upstream/master: Makes AccumulateGrad high priority in backwards passes (pytorch#7604) [C++ API] Implement builder style construction (pytorch#7597) C10D: Added TCPStore to support C10D store interface (pytorch#7560) [auto] Update onnx to ba86ec2 - Protobuf typing (onnx/onnx#982) onnx/onnx@ba86ec2 Add LBFGS optimization algorithm to C++ API (pytorch#7596)
Reference: pytorch#7434 * C10D: Added TCPStore to support C10D store interface * Used pipe to terminate the store daemon and addressed all comments * Used notify/wake for wait and addressed all comments * Clean up nits * Clean up all socket states when the socket is closed
Added TCP Store as another Store for the C10D interface.
For more details on the C10D Store interface, refer: #7434
A few things done in the PR:
(1) TCP Store can be initialized either as a master who will launch a daemon thread or a slave who will connect to the master's store daemon thread.
(2) TCP Store supports all the interface functions for Store.h
(3) Refactored some of the TCP helper functions into Utils.hoo/cpp
(4) Refactored test to some common folders.
Test Plan: