diff --git a/dvc/rwlock.py b/dvc/rwlock.py index d805bcd55a..ce452d0a0c 100644 --- a/dvc/rwlock.py +++ b/dvc/rwlock.py @@ -5,7 +5,6 @@ from contextlib import contextmanager from voluptuous import Schema, Invalid -from funcy.py3 import lmap, lfilter, lmapcat from .exceptions import DvcException from .lock import LockError @@ -61,38 +60,25 @@ def _infos_to_str(infos): ) -def _check_no_writers(lock, info, path_infos): - for path_info in path_infos: - blocking_urls = lfilter(path_info.overlaps, lock["write"]) - if not blocking_urls: - continue +def _check_blockers(lock, info, *, mode, waiters): + for path_info in waiters: + blockers = [ + info + for path, infos in lock[mode].items() + if path_info.overlaps(path) + if info not in (infos if type(infos) is list else [infos]) + ] - writers = lmap(lock["write"].get, blocking_urls) - writers = lfilter(lambda i: info != i, writers) - if not writers: + if not blockers: continue raise LockError( - "'{}' is busy, it is being written to by:\n{}".format( - str(path_info), _infos_to_str(writers) - ) - ) - - -def _check_no_readers(lock, info, path_infos): - for path_info in path_infos: - blocking_urls = lfilter(path_info.overlaps, lock["read"]) - if not blocking_urls: - continue - - readers = lmapcat(lock["read"].get, blocking_urls) - readers = lfilter(lambda i: info != i, readers) - if not readers: - continue - - raise LockError( - "'{}' is busy, it is being read by:\n{}".format( - str(path_info), _infos_to_str(readers) + "'{path}' is busy, it is being blocked by:\n" + "{blockers}\n" + "\n" + "If there are no processes with such PIDs, you can manually remove" + "'.dvc/tmp/rwlock' and try again.".format( + path=str(path_info), blockers=_infos_to_str(blockers) ) ) @@ -171,8 +157,9 @@ def rwlock(tmp_dir, cmd, read, write): info = {"pid": os.getpid(), "cmd": cmd} with _edit_rwlock(tmp_dir) as lock: - _check_no_writers(lock, info, read + write) - _check_no_readers(lock, info, write) + + _check_blockers(lock, info, mode="write", waiters=read + write) + _check_blockers(lock, info, mode="read", waiters=write) rchanges = _acquire_read(lock, info, read) wchanges = _acquire_write(lock, info, write)