Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions dvc/rwlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from .exceptions import DvcException
from .fs import localfs
from .lock import make_lock
from .utils import relpath

INFO_SCHEMA = {Required("pid"): int, Required("cmd"): str}
Expand All @@ -33,22 +34,29 @@ def __init__(self, path):


@contextmanager
def _edit_rwlock(lock_dir, fs):
def _edit_rwlock(lock_dir, fs, hardlink):
path = fs.path.join(lock_dir, "rwlock")
try:
with fs.open(path, encoding="utf-8") as fobj:
lock = SCHEMA(json.load(fobj))
except FileNotFoundError:
lock = SCHEMA({})
except json.JSONDecodeError as exc:
raise RWLockFileCorruptedError(path) from exc
except Invalid as exc:
raise RWLockFileFormatError(path) from exc
lock["read"] = defaultdict(list, lock["read"])
lock["write"] = defaultdict(dict, lock["write"])
yield lock
with fs.open(path, "w", encoding="utf-8") as fobj:
json.dump(lock, fobj)

rwlock_guard = make_lock(
fs.path.join(lock_dir, "rwlock.lock"),
tmp_dir=lock_dir,
hardlink_lock=hardlink,
)
with rwlock_guard:
try:
with fs.open(path, encoding="utf-8") as fobj:
lock = SCHEMA(json.load(fobj))
except FileNotFoundError:
lock = SCHEMA({})
except json.JSONDecodeError as exc:
raise RWLockFileCorruptedError(path) from exc
except Invalid as exc:
raise RWLockFileFormatError(path) from exc
lock["read"] = defaultdict(list, lock["read"])
lock["write"] = defaultdict(dict, lock["write"])
yield lock
with fs.open(path, "w", encoding="utf-8") as fobj:
json.dump(lock, fobj)


def _infos_to_str(infos):
Expand Down Expand Up @@ -133,7 +141,7 @@ def _release_read(lock, info, changes):


@contextmanager
def rwlock(tmp_dir, fs, cmd, read, write):
def rwlock(tmp_dir, fs, cmd, read, write, hardlink):
"""Create non-thread-safe RWLock for file paths.

Args:
Expand All @@ -142,6 +150,7 @@ def rwlock(tmp_dir, fs, cmd, read, write):
cmd (str): command that will be working on these file path.
read ([str]): file paths that are going to be read.
write ([str]): file paths that are going to be written.
hardlink (bool): use hardlink lock to guard rwlock file when on edit.

Raises:
LockError: raised if file paths we want to read is being written to by
Expand All @@ -153,7 +162,7 @@ def rwlock(tmp_dir, fs, cmd, read, write):
"""
info = {"pid": os.getpid(), "cmd": cmd}

with _edit_rwlock(tmp_dir, fs) as lock:
with _edit_rwlock(tmp_dir, fs, hardlink) as lock:

_check_blockers(lock, info, mode="write", waiters=read + write)
_check_blockers(lock, info, mode="read", waiters=write)
Expand All @@ -164,6 +173,6 @@ def rwlock(tmp_dir, fs, cmd, read, write):
try:
yield
finally:
with _edit_rwlock(tmp_dir, fs) as lock:
with _edit_rwlock(tmp_dir, fs, hardlink) as lock:
_release_write(lock, info, wchanges)
_release_read(lock, info, rchanges)
7 changes: 6 additions & 1 deletion dvc/stage/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ def _chain(names):
cmd = " ".join(sys.argv)

with rwlock(
stage.repo.tmp_dir, stage.repo.fs, cmd, _chain(read), _chain(write)
stage.repo.tmp_dir,
stage.repo.fs,
cmd,
_chain(read),
_chain(write),
stage.repo.config["core"].get("hardlink_lock", False),
):
return call()

Expand Down
56 changes: 34 additions & 22 deletions tests/unit/test_rwlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,79 @@ def test_rwlock(tmp_path):
path = os.fspath(tmp_path)
foo = "foo"

with rwlock(path, localfs, "cmd1", [foo], []):
with rwlock(path, localfs, "cmd1", [foo], [], False):
with pytest.raises(LockError):
with rwlock(path, localfs, "cmd2", [], [foo]):
with rwlock(path, localfs, "cmd2", [], [foo], False):
pass

with rwlock(path, localfs, "cmd1", [], [foo]):
with rwlock(path, localfs, "cmd1", [], [foo], False):
with pytest.raises(LockError):
with rwlock(path, localfs, "cmd2", [foo], []):
with rwlock(path, localfs, "cmd2", [foo], [], False):
pass

with rwlock(path, localfs, "cmd1", [], [foo]):
with rwlock(path, localfs, "cmd1", [], [foo], False):
with pytest.raises(LockError):
with rwlock(path, localfs, "cmd2", [], [foo]):
with rwlock(path, localfs, "cmd2", [], [foo], False):
pass


def test_rwlock_reentrant(tmp_path):
path = os.fspath(tmp_path)
foo = "foo"

with rwlock(path, localfs, "cmd1", [], [foo]):
with rwlock(path, localfs, "cmd1", [], [foo]):
with rwlock(path, localfs, "cmd1", [], [foo], False):
with rwlock(path, localfs, "cmd1", [], [foo], False):
pass
with _edit_rwlock(path, localfs) as lock:
with _edit_rwlock(path, localfs, False) as lock:
assert lock == {
"read": {},
"write": {"foo": {"cmd": "cmd1", "pid": os.getpid()}},
}

with rwlock(path, localfs, "cmd", [foo], []):
with rwlock(path, localfs, "cmd", [foo], []):
with rwlock(path, localfs, "cmd", [foo], [], False):
with rwlock(path, localfs, "cmd", [foo], [], False):
pass
with _edit_rwlock(path, localfs) as lock:
with _edit_rwlock(path, localfs, False) as lock:
assert lock == {
"read": {"foo": [{"cmd": "cmd", "pid": os.getpid()}]},
"write": {},
}


def test_rwlock_edit_is_guarded(tmp_path, mocker):
# patching to speedup tests
mocker.patch("dvc.lock.DEFAULT_TIMEOUT", 0.01)

path = os.fspath(tmp_path)

with _edit_rwlock(path, localfs, False):
with pytest.raises(LockError):
with _edit_rwlock(path, localfs, False):
pass


def test_rwlock_subdirs(tmp_path):
path = os.fspath(tmp_path)
foo = "foo"
subfoo = os.path.join("foo", "subfoo")

with rwlock(path, localfs, "cmd1", [foo], []):
with rwlock(path, localfs, "cmd1", [foo], [], False):
with pytest.raises(LockError, match=r"subfoo(.|\n)*cmd1"):
with rwlock(path, localfs, "cmd2", [], [subfoo]):
with rwlock(path, localfs, "cmd2", [], [subfoo], False):
pass

with rwlock(path, localfs, "cmd1", [], [subfoo]):
with rwlock(path, localfs, "cmd1", [], [subfoo], False):
with pytest.raises(LockError, match=r"'foo'(.|\n)*cmd1"):
with rwlock(path, localfs, "cmd2", [foo], []):
with rwlock(path, localfs, "cmd2", [foo], [], False):
pass

with rwlock(path, localfs, "cmd1", [], [subfoo]):
with rwlock(path, localfs, "cmd1", [], [subfoo], False):
with pytest.raises(LockError):
with rwlock(path, localfs, "cmd2", [], [foo]):
with rwlock(path, localfs, "cmd2", [], [foo], False):
pass

with rwlock(path, localfs, "cmd1", [subfoo], []):
with rwlock(path, localfs, "cmd2", [foo], []):
with rwlock(path, localfs, "cmd1", [subfoo], [], False):
with rwlock(path, localfs, "cmd2", [foo], [], False):
pass


Expand All @@ -86,10 +98,10 @@ def test_broken_rwlock(tmp_path):

path.write_text('{"broken": "format"}', encoding="utf-8")
with pytest.raises(RWLockFileFormatError):
with _edit_rwlock(dir_path, localfs):
with _edit_rwlock(dir_path, localfs, False):
pass

path.write_text("{broken json", encoding="utf-8")
with pytest.raises(RWLockFileCorruptedError):
with _edit_rwlock(dir_path, localfs):
with _edit_rwlock(dir_path, localfs, False):
pass