Skip to content

Commit

Permalink
delete has no more data after the key
Browse files Browse the repository at this point in the history
The TCPStore delete-key implementation inadvertently set "moreData"
when sending the key, even though the key was in fact the last message.

Thank you, @PetrochukM, for the reproducing example.

Fixes #53872
  • Loading branch information
t-vi committed Mar 12, 2021
1 parent 1772e26 commit 81870e6
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
76 changes: 76 additions & 0 deletions test/distributed/test_c10d.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,82 @@ def _test_numkeys_delkeys(self, fs):
self.assertEqual(b"value1", fs.get("key1"))
self.assertEqual(b"value2", fs.get("key4"))

store = self._create_store()
keys = [str(i) for i in range(10)]
t0 = time.perf_counter()
[store.set(k, k) for k in keys]
dur_set = time.perf_counter()
t0 = time.perf_counter()
[store.delete_key(k) for k in keys]
dur_delete = time.perf_counter()
print(dur_set, dur_delete)

def _test_delkey_perf_worker(self, index, addr, port, world_size, messages):
    """Per-process body used by ``test_delkey_perf``.

    Every process connects a ``dist.TCPStore`` client to ``addr:port``.
    Processes with a non-zero ``index`` set the key ``str(index)`` five
    times, pausing 0.1s after each set.  The process with ``index == 0``
    runs five rounds over the keys ``"1" .. str(world_size - 1)``; each
    round times ``wait``, ``get`` and ``delete_key`` across all keys and
    reports a failure when the delete pass takes more than five times as
    long as the slower of the other two passes.

    Failures are reported by putting a text message on ``messages`` and
    exiting with ``MultiProcessTestCase.TEST_ERROR_EXIT_CODE``.
    """

    def elapsed_ms(operation, names):
        # Wall-clock milliseconds spent applying ``operation`` to each name.
        began = time.perf_counter()
        for name in names:
            operation(name)
        return (time.perf_counter() - began) * 1000

    try:
        store = dist.TCPStore(addr, port, world_size, timeout=timedelta(seconds=10))
        if index != 0:
            # Writer processes: keep re-creating this process's own key.
            for _ in range(5):
                store.set(str(index), str(index))
                time.sleep(0.1)
        else:
            keys = [str(rank) for rank in range(1, world_size)]

            for _ in range(5):
                dur_wait = elapsed_ms(lambda name: store.wait([name]), keys)
                # Named "store" only to match the wording of the message below;
                # this is the time spent in ``store.get``.
                dur_store = elapsed_ms(store.get, keys)
                dur_delete = elapsed_ms(store.delete_key, keys)

                slowest_other = max(dur_wait, dur_store)
                if dur_delete > 5 * slowest_other:
                    messages.put(
                        f"Delete is awfully slow {dur_delete:.2f}ms "
                        f"vs. wait {dur_wait:.2f}ms, store {dur_store:.2f}ms"
                    )
                    # SystemExit is not an Exception subclass, so the handler
                    # below does not intercept this exit.
                    sys.exit(MultiProcessTestCase.TEST_ERROR_EXIT_CODE)

                time.sleep(0.1)

    except Exception:
        messages.put('Caught exception: \n{}exiting process with exit code: {}'
                     .format(traceback.format_exc(), MultiProcessTestCase.TEST_ERROR_EXIT_CODE))
        sys.exit(MultiProcessTestCase.TEST_ERROR_EXIT_CODE)



@unittest.skipIf(
    IS_WINDOWS, "Skip test for windows due to multiprocessing library error when using windows spawn"
)
def test_delkey_perf(self):
    """Check that ``delete_key`` timing is roughly in line with ``wait``/``get``.

    Starts a server store, launches ``world_size`` processes running
    ``_test_delkey_perf_worker`` against it, waits for all of them, and
    raises ``RuntimeError`` carrying every queued worker message if any
    process finished with a non-zero exit code.
    """
    # test that delkey is roughly in line perfwise, see issue #53872
    addr = DEFAULT_HOSTNAME
    world_size = 5
    server_store = create_tcp_store(addr, world_size, wait_for_workers=False)
    server_store.set("key", "value")
    port = server_store.port
    messages = mp.Queue()

    # One worker process per rank.
    workers = [
        mp.Process(
            target=self._test_delkey_perf_worker,
            args=(rank, addr, port, world_size, messages),
        )
        for rank in range(world_size)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # Drain whatever the workers reported, one message per line.
    reported = []
    while not messages.empty():
        reported.append(messages.get() + "\n")

    if any(worker.exitcode != 0 for worker in workers):
        raise RuntimeError("".join(reported))


def test_numkeys_delkeys(self):
    """Run the shared ``_test_numkeys_delkeys`` checks on a store from ``_create_store()``."""
    fresh_store = self._create_store()
    self._test_numkeys_delkeys(fresh_store)

Expand Down
2 changes: 1 addition & 1 deletion torch/lib/c10d/TCPStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ int64_t TCPStore::add(const std::string& key, int64_t value) {
// Deletes `key` from the store: sends a DELETE_KEY query followed by the
// prefixed key over storeSocket_, then reads back how many keys were removed.
// Returns true only when exactly one key was reported deleted.
//
// NOTE(review): this listing is a rendered diff hunk ("1 addition & 1
// deletion") whose +/- markers were stripped. The two sendString lines below
// are the before/after versions of one statement, not two consecutive sends.
bool TCPStore::deleteKey(const std::string& key) {
// The key is combined with regularPrefix_ before it is sent.
std::string regKey = regularPrefix_ + key;
tcputil::sendValue<QueryType>(storeSocket_, QueryType::DELETE_KEY);
// Removed by this commit: per the commit message, the trailing `true` set
// "moreData" even though the key is the last message of the request.
tcputil::sendString(storeSocket_, regKey, true);
// Added by this commit: the key is sent without that extra argument.
tcputil::sendString(storeSocket_, regKey);
auto numDeleted = tcputil::recvValue<int64_t>(storeSocket_);
return (numDeleted == 1);
}
Expand Down

0 comments on commit 81870e6

Please sign in to comment.