From 64beee930760fb274655fcbfe920c721b4bd4986 Mon Sep 17 00:00:00 2001
From: Ivan Shcheklein
Date: Tue, 9 Aug 2022 21:59:30 -0700
Subject: [PATCH] guard rwlock file to prevent not locked repo edits

---
 dvc/rwlock.py             | 45 ++++++++++++++++++-------------
 dvc/stage/decorators.py   |  7 ++++-
 tests/unit/test_rwlock.py | 56 ++++++++++++++++++++++++---------------
 3 files changed, 67 insertions(+), 41 deletions(-)

diff --git a/dvc/rwlock.py b/dvc/rwlock.py
index e1125b0500..b6937c1552 100644
--- a/dvc/rwlock.py
+++ b/dvc/rwlock.py
@@ -7,6 +7,7 @@
 
 from .exceptions import DvcException
 from .fs import localfs
+from .lock import make_lock
 from .utils import relpath
 
 INFO_SCHEMA = {Required("pid"): int, Required("cmd"): str}
@@ -33,22 +34,29 @@ def __init__(self, path):
 
 
 @contextmanager
-def _edit_rwlock(lock_dir, fs):
+def _edit_rwlock(lock_dir, fs, hardlink):
     path = fs.path.join(lock_dir, "rwlock")
-    try:
-        with fs.open(path, encoding="utf-8") as fobj:
-            lock = SCHEMA(json.load(fobj))
-    except FileNotFoundError:
-        lock = SCHEMA({})
-    except json.JSONDecodeError as exc:
-        raise RWLockFileCorruptedError(path) from exc
-    except Invalid as exc:
-        raise RWLockFileFormatError(path) from exc
-    lock["read"] = defaultdict(list, lock["read"])
-    lock["write"] = defaultdict(dict, lock["write"])
-    yield lock
-    with fs.open(path, "w", encoding="utf-8") as fobj:
-        json.dump(lock, fobj)
+
+    rwlock_guard = make_lock(
+        fs.path.join(lock_dir, "rwlock.lock"),
+        tmp_dir=lock_dir,
+        hardlink_lock=hardlink,
+    )
+    with rwlock_guard:
+        try:
+            with fs.open(path, encoding="utf-8") as fobj:
+                lock = SCHEMA(json.load(fobj))
+        except FileNotFoundError:
+            lock = SCHEMA({})
+        except json.JSONDecodeError as exc:
+            raise RWLockFileCorruptedError(path) from exc
+        except Invalid as exc:
+            raise RWLockFileFormatError(path) from exc
+        lock["read"] = defaultdict(list, lock["read"])
+        lock["write"] = defaultdict(dict, lock["write"])
+        yield lock
+        with fs.open(path, "w", encoding="utf-8") as fobj:
+            json.dump(lock, fobj)
 
 
 def _infos_to_str(infos):
@@ -133,7 +141,7 @@ def _release_read(lock, info, changes):
 
 
 @contextmanager
-def rwlock(tmp_dir, fs, cmd, read, write):
+def rwlock(tmp_dir, fs, cmd, read, write, hardlink):
     """Create non-thread-safe RWLock for file paths.
 
     Args:
@@ -142,6 +150,7 @@ def rwlock(tmp_dir, fs, cmd, read, write):
         cmd (str): command that will be working on these file path.
         read ([str]): file paths that are going to be read.
         write ([str]): file paths that are going to be written.
+        hardlink (bool): use hardlink lock to guard rwlock file when on edit.
 
     Raises:
         LockError: raised if file paths we want to read is being written to by
@@ -153,7 +162,7 @@ def rwlock(tmp_dir, fs, cmd, read, write):
     """
     info = {"pid": os.getpid(), "cmd": cmd}
 
-    with _edit_rwlock(tmp_dir, fs) as lock:
+    with _edit_rwlock(tmp_dir, fs, hardlink) as lock:
 
         _check_blockers(lock, info, mode="write", waiters=read + write)
         _check_blockers(lock, info, mode="read", waiters=write)
@@ -164,6 +173,6 @@ def rwlock(tmp_dir, fs, cmd, read, write):
     try:
         yield
     finally:
-        with _edit_rwlock(tmp_dir, fs) as lock:
+        with _edit_rwlock(tmp_dir, fs, hardlink) as lock:
             _release_write(lock, info, wchanges)
             _release_read(lock, info, rchanges)
diff --git a/dvc/stage/decorators.py b/dvc/stage/decorators.py
index 056d554078..db0012f97c 100644
--- a/dvc/stage/decorators.py
+++ b/dvc/stage/decorators.py
@@ -33,7 +33,12 @@ def _chain(names):
     cmd = " ".join(sys.argv)
 
     with rwlock(
-        stage.repo.tmp_dir, stage.repo.fs, cmd, _chain(read), _chain(write)
+        stage.repo.tmp_dir,
+        stage.repo.fs,
+        cmd,
+        _chain(read),
+        _chain(write),
+        stage.repo.config["core"].get("hardlink_lock", False),
     ):
         return call()
 
diff --git a/tests/unit/test_rwlock.py b/tests/unit/test_rwlock.py
index 4e932901c2..6bf7ae043c 100644
--- a/tests/unit/test_rwlock.py
+++ b/tests/unit/test_rwlock.py
@@ -16,19 +16,19 @@ def test_rwlock(tmp_path):
     path = os.fspath(tmp_path)
     foo = "foo"
 
-    with rwlock(path, localfs, "cmd1", [foo], []):
+    with rwlock(path, localfs, "cmd1", [foo], [], False):
         with pytest.raises(LockError):
-            with rwlock(path, localfs, "cmd2", [], [foo]):
+            with rwlock(path, localfs, "cmd2", [], [foo], False):
                 pass
 
-    with rwlock(path, localfs, "cmd1", [], [foo]):
+    with rwlock(path, localfs, "cmd1", [], [foo], False):
         with pytest.raises(LockError):
-            with rwlock(path, localfs, "cmd2", [foo], []):
+            with rwlock(path, localfs, "cmd2", [foo], [], False):
                 pass
 
-    with rwlock(path, localfs, "cmd1", [], [foo]):
+    with rwlock(path, localfs, "cmd1", [], [foo], False):
         with pytest.raises(LockError):
-            with rwlock(path, localfs, "cmd2", [], [foo]):
+            with rwlock(path, localfs, "cmd2", [], [foo], False):
                 pass
 
 
@@ -36,47 +36,59 @@ def test_rwlock_reentrant(tmp_path):
     path = os.fspath(tmp_path)
     foo = "foo"
 
-    with rwlock(path, localfs, "cmd1", [], [foo]):
-        with rwlock(path, localfs, "cmd1", [], [foo]):
+    with rwlock(path, localfs, "cmd1", [], [foo], False):
+        with rwlock(path, localfs, "cmd1", [], [foo], False):
             pass
-        with _edit_rwlock(path, localfs) as lock:
+        with _edit_rwlock(path, localfs, False) as lock:
             assert lock == {
                 "read": {},
                 "write": {"foo": {"cmd": "cmd1", "pid": os.getpid()}},
             }
 
-    with rwlock(path, localfs, "cmd", [foo], []):
-        with rwlock(path, localfs, "cmd", [foo], []):
+    with rwlock(path, localfs, "cmd", [foo], [], False):
+        with rwlock(path, localfs, "cmd", [foo], [], False):
             pass
-        with _edit_rwlock(path, localfs) as lock:
+        with _edit_rwlock(path, localfs, False) as lock:
             assert lock == {
                 "read": {"foo": [{"cmd": "cmd", "pid": os.getpid()}]},
                 "write": {},
             }
 
 
+def test_rwlock_edit_is_guarded(tmp_path, mocker):
+    # patching to speedup tests
+    mocker.patch("dvc.lock.DEFAULT_TIMEOUT", 0.01)
+
+    path = os.fspath(tmp_path)
+
+    with _edit_rwlock(path, localfs, False):
+        with pytest.raises(LockError):
+            with _edit_rwlock(path, localfs, False):
+                pass
+
+
 def test_rwlock_subdirs(tmp_path):
     path = os.fspath(tmp_path)
     foo = "foo"
     subfoo = os.path.join("foo", "subfoo")
 
-    with rwlock(path, localfs, "cmd1", [foo], []):
+    with rwlock(path, localfs, "cmd1", [foo], [], False):
         with pytest.raises(LockError, match=r"subfoo(.|\n)*cmd1"):
-            with rwlock(path, localfs, "cmd2", [], [subfoo]):
+            with rwlock(path, localfs, "cmd2", [], [subfoo], False):
                 pass
 
-    with rwlock(path, localfs, "cmd1", [], [subfoo]):
+    with rwlock(path, localfs, "cmd1", [], [subfoo], False):
         with pytest.raises(LockError, match=r"'foo'(.|\n)*cmd1"):
-            with rwlock(path, localfs, "cmd2", [foo], []):
+            with rwlock(path, localfs, "cmd2", [foo], [], False):
                 pass
 
-    with rwlock(path, localfs, "cmd1", [], [subfoo]):
+    with rwlock(path, localfs, "cmd1", [], [subfoo], False):
         with pytest.raises(LockError):
-            with rwlock(path, localfs, "cmd2", [], [foo]):
+            with rwlock(path, localfs, "cmd2", [], [foo], False):
                 pass
 
-    with rwlock(path, localfs, "cmd1", [subfoo], []):
-        with rwlock(path, localfs, "cmd2", [foo], []):
+    with rwlock(path, localfs, "cmd1", [subfoo], [], False):
+        with rwlock(path, localfs, "cmd2", [foo], [], False):
             pass
 
 
@@ -86,10 +98,10 @@ def test_broken_rwlock(tmp_path):
 
     path.write_text('{"broken": "format"}', encoding="utf-8")
     with pytest.raises(RWLockFileFormatError):
-        with _edit_rwlock(dir_path, localfs):
+        with _edit_rwlock(dir_path, localfs, False):
             pass
 
     path.write_text("{broken json", encoding="utf-8")
     with pytest.raises(RWLockFileCorruptedError):
-        with _edit_rwlock(dir_path, localfs):
+        with _edit_rwlock(dir_path, localfs, False):
             pass