From 1bdaca0d6c9ddf84e4f3a5a951821e445c1312b0 Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Tue, 3 Sep 2019 20:03:26 +0700 Subject: [PATCH 1/5] dvc: switch to flufl lock This locks employs hardlink trick, which works on NFS and alike unlike fcntl/open locks. Closes #1918. --- dvc/analytics.py | 4 +-- dvc/lock.py | 63 +++++++++++++++++++++-------------------- dvc/logger.py | 4 +++ dvc/repo/__init__.py | 12 ++++++-- dvc/updater.py | 6 ++-- setup.py | 4 +-- tests/func/test_lock.py | 10 +++++-- 7 files changed, 60 insertions(+), 43 deletions(-) diff --git a/dvc/analytics.py b/dvc/analytics.py index 6643db7684..e4b0dc425e 100644 --- a/dvc/analytics.py +++ b/dvc/analytics.py @@ -66,7 +66,7 @@ def __init__(self, info=None): raise self.user_id_file = os.path.join(cdir, self.USER_ID_FILE) - self.user_id_file_lock = Lock(cdir, self.USER_ID_FILE + ".lock") + self.user_id_file_lock = Lock(self.user_id_file + ".lock") @staticmethod def load(path): @@ -113,7 +113,7 @@ def _get_user_id(self): return user_id except LockError: msg = "Failed to acquire '{}'" - logger.debug(msg.format(self.user_id_file_lock.lock_file)) + logger.debug(msg.format(self.user_id_file_lock.lockfile)) def _collect_windows(self): import sys diff --git a/dvc/lock.py b/dvc/lock.py index 81356a0708..40ecc30939 100644 --- a/dvc/lock.py +++ b/dvc/lock.py @@ -3,58 +3,59 @@ from __future__ import unicode_literals import os -import time -import zc.lockfile +from datetime import timedelta +from flufl.lock import Lock as _Lock, TimeOutError from dvc.exceptions import DvcException +from dvc.utils import makedirs class LockError(DvcException): """Thrown when unable to acquire the lock for dvc repo.""" -class Lock(object): +class Lock(_Lock): """Class for dvc repo lock. Args: - dvc_dir (str): path to the directory that the lock should be created + lockfile (str): the lock filename in. - name (str): name of the lock file. + lifetime (int | timedelta): hold the lock for so long. + tmp_dir (str): a directory to store claim files. """ - LOCK_FILE = "lock" TIMEOUT = 5 - def __init__(self, dvc_dir, name=LOCK_FILE): - self.lock_file = os.path.join(dvc_dir, name) - self._lock = None + def __init__(self, lockfile, lifetime=None, tmp_dir=None): + if isinstance(lifetime, int): + lifetime = timedelta(seconds=lifetime) - def _do_lock(self): + self._tmp_dir = tmp_dir + if self._tmp_dir is not None: + makedirs(self._tmp_dir, exist_ok=True) + + super(Lock, self).__init__(lockfile, lifetime=lifetime) + + @property + def lockfile(self): + return self._lockfile + + @property + def tmp_dir(self): + return self._tmp_dir + + def lock(self): try: - self._lock = zc.lockfile.LockFile(self.lock_file) - except zc.lockfile.LockError: + super(Lock, self).lock(timedelta(seconds=self.TIMEOUT)) + except TimeOutError: raise LockError( "cannot perform the cmd since DVC is busy and " "locked. Please retry the cmd later." ) - def lock(self): - """Acquire lock for dvc repo.""" - try: - self._do_lock() - return - except LockError: - time.sleep(self.TIMEOUT) - - self._do_lock() - - def unlock(self): - """Release lock for dvc repo.""" - self._lock.close() - self._lock = None - - def __enter__(self): - self.lock() + def _set_claimfile(self, pid=None): + super(Lock, self)._set_claimfile(pid) - def __exit__(self, typ, value, tbck): - self.unlock() + if self._tmp_dir is not None: + filename = self._claimfile.replace(os.sep, "_") + self._claimfile = os.path.join(self._tmp_dir, filename) diff --git a/dvc/logger.py b/dvc/logger.py index f9fe77b8d1..cafedd728c 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -196,6 +196,10 @@ def setup(level=logging.INFO): "level": logging.CRITICAL, "handlers": ["console", "console_errors"], }, + "flufl.lock": { + "level": logging.CRITICAL, + "handlers": ["console", "console_errors"], + }, }, } ) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 2f743a6ff9..0502adbcc2 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -70,7 +70,12 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) - self.lock = Lock(self.dvc_dir) + self.lock = Lock( + os.path.join(self.dvc_dir, "lock"), + # This should be longer than operations we are protecting + lifetime=60 * 60 * 24 * 30, + tmp_dir=os.path.join(self.dvc_dir, "tmp"), + ) # NOTE: storing state and link_state in the repository itself to avoid # any possible state corruption in 'shared cache dir' scenario. self.state = State(self, self.config.config) @@ -129,10 +134,11 @@ def _ignore(self): updater = Updater(self.dvc_dir) flist = [ - self.lock.lock_file, + self.lock.lockfile, + self.lock.tmp_dir, self.config.config_local_file, updater.updater_file, - updater.lock.lock_file, + updater.lock.lockfile, ] + self.state.files if self.cache.local.cache_dir.startswith(self.root_dir): diff --git a/dvc/updater.py b/dvc/updater.py index 008e37ea7b..388d8c94b7 100644 --- a/dvc/updater.py +++ b/dvc/updater.py @@ -25,7 +25,9 @@ class Updater(object): # pragma: no cover def __init__(self, dvc_dir): self.dvc_dir = dvc_dir self.updater_file = os.path.join(dvc_dir, self.UPDATER_FILE) - self.lock = Lock(dvc_dir, self.updater_file + ".lock") + self.lock = Lock( + self.updater_file + ".lock", tmp_dir=os.path.join(dvc_dir, "tmp") + ) self.current = parse_version(__version__).base_version def _is_outdated_file(self): @@ -41,7 +43,7 @@ def _with_lock(self, func, action): func() except LockError: msg = "Failed to acquire '{}' before {} updates" - logger.debug(msg.format(self.lock.lock_file, action)) + logger.debug(msg.format(self.lock.lockfile, action)) def check(self): if os.getenv("CI") or os.getenv("DVC_TEST"): diff --git a/setup.py b/setup.py index 94c3b5153d..c80bb2ac2d 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,6 @@ def run(self): install_requires = [ "ply>=3.9", # See https://github.com/pyinstaller/pyinstaller/issues/1945 "configparser>=3.5.0", - "zc.lockfile>=1.2.1", "future>=0.16.0", "colorama>=0.3.9", "configobj>=5.0.6", @@ -139,7 +138,8 @@ def run(self): "ssh_gssapi": ssh_gssapi, "hdfs": hdfs, # NOTE: https://github.com/inveniosoftware/troubleshooting/issues/1 - ":python_version=='2.7'": ["futures", "pathlib2"], + ":python_version=='2.7'": ["futures", "pathlib2", "flufl.lock==2.4.1"], + ":python_version>='3.0'": ["flufl.lock>=3.2"], "tests": tests_requirements, }, keywords="data science, data version control, machine learning", diff --git a/tests/func/test_lock.py b/tests/func/test_lock.py index 1c5a0bda23..ad04f22edc 100644 --- a/tests/func/test_lock.py +++ b/tests/func/test_lock.py @@ -1,3 +1,5 @@ +import os + from dvc.lock import LockError from dvc.main import main from dvc.lock import Lock @@ -7,15 +9,17 @@ class TestLock(TestDvc): def test_with(self): - lock = Lock(self.dvc.dvc_dir) + lockfile = os.path.join(self.dvc.dvc_dir, "lock") + lock = Lock(lockfile) with lock: with self.assertRaises(LockError): - lock2 = Lock(self.dvc.dvc_dir) + lock2 = Lock(lockfile) with lock2: self.assertTrue(False) def test_cli(self): - lock = Lock(self.dvc.dvc_dir) + lockfile = os.path.join(self.dvc.dvc_dir, "lock") + lock = Lock(lockfile) with lock: ret = main(["add", self.FOO]) self.assertEqual(ret, 1) From 6aeccfcc07aa0b0037d57f4c76faaf4edcb66c6b Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Wed, 4 Sep 2019 15:24:36 +0700 Subject: [PATCH 2/5] dvc: fix long lock filenames under Windows --- dvc/lock.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dvc/lock.py b/dvc/lock.py index 40ecc30939..32a4f460cf 100644 --- a/dvc/lock.py +++ b/dvc/lock.py @@ -4,6 +4,7 @@ import os from datetime import timedelta +import hashlib from flufl.lock import Lock as _Lock, TimeOutError from dvc.exceptions import DvcException @@ -57,5 +58,6 @@ def _set_claimfile(self, pid=None): super(Lock, self)._set_claimfile(pid) if self._tmp_dir is not None: - filename = self._claimfile.replace(os.sep, "_") - self._claimfile = os.path.join(self._tmp_dir, filename) + # Under Windows file path length is limited so we hash it + filename = hashlib.md5(self._claimfile.encode()).hexdigest() + self._claimfile = os.path.join(self._tmp_dir, filename + ".lock") From f29f07ca889365c275b35d842efc55a44f18fa18 Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Wed, 4 Sep 2019 15:25:28 +0700 Subject: [PATCH 3/5] dvc: do not lock sqlite db --- dvc/state.py | 32 ++++++++++++++++++++++++++++++-- dvc/utils/compat.py | 7 ++++--- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/dvc/state.py b/dvc/state.py index 4a2529b225..61f82549d9 100644 --- a/dvc/state.py +++ b/dvc/state.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import os +import re import sqlite3 import logging @@ -10,7 +11,8 @@ from dvc.utils import remove, current_timestamp, relpath, to_chunks from dvc.exceptions import DvcException from dvc.utils.fs import get_mtime_and_size, get_inode -from dvc.utils.compat import fspath_py35 +from dvc.utils.compat import fspath_py35, urlencode, urlunparse, is_py2 + SQLITE_MAX_VARIABLES_NUMBER = 999 @@ -213,7 +215,11 @@ def load(self): assert self.cursor is None assert self.inserts == 0 empty = not os.path.exists(self.state_file) - self.database = sqlite3.connect(self.state_file) + # NOTE: we use nolock option because fcntl() lock sqlite uses + # doesn't work on some older NFS/CIFS filesystems. + # This opens a possibility of data corruption by concurrent writes, + # which is prevented by repo lock. + self.database = _connect_sqlite(self.state_file, {"nolock": 1}) self.cursor = self.database.cursor() # Try loading once to check that the file is indeed a database @@ -473,3 +479,25 @@ def remove_unused_links(self, used): self.LINK_STATE_TABLE, ",".join(["?"] * len(chunk_unused)) ) self._execute(cmd, tuple(chunk_unused)) + + +def _connect_sqlite(filename, options): + # Connect by URI was added in Python 3.4 and sqlite 3.7.7, + # we ignore options, which should be fine unless repo is on old NFS/CIFS + if is_py2 or sqlite3.sqlite_version_info < (3, 7, 7): + return sqlite3.connect(filename) + + uri = _build_sqlite_uri(filename, options) + return sqlite3.connect(uri, uri=True) + + +def _build_sqlite_uri(filename, options): + # Convert filename to uri according to https://www.sqlite.org/uri.html, 3.1 + uri_path = filename.replace("?", "%3f").replace("#", "%23") + if os.name == "nt": + uri_path = uri_path.replace("\\", "/") + uri_path = re.sub(r"^([a-z]:)", "/\\1", uri_path, flags=re.I) + uri_path = re.sub(r"/+", "/", uri_path) + + # Empty netloc, params and fragment + return urlunparse(("file", "", uri_path, "", urlencode(options), "")) diff --git a/dvc/utils/compat.py b/dvc/utils/compat.py index 9dad37d3f1..f45becc9c7 100644 --- a/dvc/utils/compat.py +++ b/dvc/utils/compat.py @@ -101,7 +101,8 @@ def ignore_file_not_found(): if is_py2: - from urlparse import urlparse, urljoin, urlsplit, urlunsplit # noqa: F401 + from urlparse import urlparse, urlunparse, urljoin # noqa: F401 + from urllib import urlencode # noqa: F401 from BaseHTTPServer import HTTPServer # noqa: F401 from SimpleHTTPServer import SimpleHTTPRequestHandler # noqa: F401 import ConfigParser # noqa: F401 @@ -144,9 +145,9 @@ def __exit__(self, *args): from os import makedirs # noqa: F401 from urllib.parse import ( # noqa: F401 urlparse, # noqa: F401 + urlunparse, # noqa: F401 + urlencode, # noqa: F401 urljoin, # noqa: F401 - urlsplit, # noqa: F401 - urlunsplit, # noqa: F401 ) from io import StringIO, BytesIO # noqa: F401 from http.server import ( # noqa: F401 From d9d85f3cb0d07757742b248c4e129d58c2ae640b Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Mon, 9 Sep 2019 14:35:00 +0700 Subject: [PATCH 4/5] dvc: fix locks in Python2.7/Windows Return to zc.lockfile lock in Python 2, NFS is not supported there anyway since sqlite3 doesn't support URIs. --- dvc/lock.py | 133 ++++++++++++++++++++++++++++++------------- dvc/repo/__init__.py | 13 ++--- setup.py | 6 +- 3 files changed, 106 insertions(+), 46 deletions(-) diff --git a/dvc/lock.py b/dvc/lock.py index 32a4f460cf..eb691f33d6 100644 --- a/dvc/lock.py +++ b/dvc/lock.py @@ -3,61 +3,118 @@ from __future__ import unicode_literals import os -from datetime import timedelta import hashlib -from flufl.lock import Lock as _Lock, TimeOutError +import time +from datetime import timedelta + +from funcy.py3 import lkeep from dvc.exceptions import DvcException from dvc.utils import makedirs +from dvc.utils.compat import is_py3 + + +DEFAULT_TIMEOUT = 5 class LockError(DvcException): """Thrown when unable to acquire the lock for dvc repo.""" -class Lock(_Lock): - """Class for dvc repo lock. +if is_py3: + import flufl.lock + + class Lock(flufl.lock.Lock): + """Class for dvc repo lock. + + Args: + lockfile (str): the lock filename + in. + lifetime (int | timedelta): hold the lock for so long. + tmp_dir (str): a directory to store claim files. + """ + + def __init__(self, lockfile, lifetime=None, tmp_dir=None): + if isinstance(lifetime, int): + lifetime = timedelta(seconds=lifetime) + + self._tmp_dir = tmp_dir + if self._tmp_dir is not None: + makedirs(self._tmp_dir, exist_ok=True) + + super(Lock, self).__init__(lockfile, lifetime=lifetime) + + @property + def lockfile(self): + return self._lockfile + + @property + def files(self): + return lkeep([self._lockfile, self._tmp_dir]) + + def lock(self): + try: + super(Lock, self).lock(timedelta(seconds=DEFAULT_TIMEOUT)) + except flufl.lock.TimeOutError: + raise LockError( + "cannot perform the cmd since DVC is busy and " + "locked. Please retry the cmd later." + ) + + def _set_claimfile(self, pid=None): + super(Lock, self)._set_claimfile(pid) + + if self._tmp_dir is not None: + # Under Windows file path length is limited so we hash it + filename = hashlib.md5(self._claimfile.encode()).hexdigest() + self._claimfile = os.path.join( + self._tmp_dir, filename + ".lock" + ) + + +else: + import zc.lockfile + + class Lock(object): + """Class for dvc repo lock. - Args: - lockfile (str): the lock filename - in. - lifetime (int | timedelta): hold the lock for so long. - tmp_dir (str): a directory to store claim files. - """ + Uses zc.lockfile as backend. + """ - TIMEOUT = 5 + TIMEOUT = 5 - def __init__(self, lockfile, lifetime=None, tmp_dir=None): - if isinstance(lifetime, int): - lifetime = timedelta(seconds=lifetime) + def __init__(self, lockfile, lifetime=None, tmp_dir=None): + self.lockfile = lockfile + self._lock = None - self._tmp_dir = tmp_dir - if self._tmp_dir is not None: - makedirs(self._tmp_dir, exist_ok=True) + @property + def files(self): + return [self.lockfile] - super(Lock, self).__init__(lockfile, lifetime=lifetime) + def _do_lock(self): + try: + self._lock = zc.lockfile.LockFile(self.lockfile) + except zc.lockfile.LockError: + raise LockError( + "cannot perform the cmd since DVC is busy and " + "locked. Please retry the cmd later." + ) - @property - def lockfile(self): - return self._lockfile + def lock(self): + try: + self._do_lock() + return + except LockError: + time.sleep(DEFAULT_TIMEOUT) - @property - def tmp_dir(self): - return self._tmp_dir + self._do_lock() - def lock(self): - try: - super(Lock, self).lock(timedelta(seconds=self.TIMEOUT)) - except TimeOutError: - raise LockError( - "cannot perform the cmd since DVC is busy and " - "locked. Please retry the cmd later." - ) + def unlock(self): + self._lock.close() + self._lock = None - def _set_claimfile(self, pid=None): - super(Lock, self)._set_claimfile(pid) + def __enter__(self): + self.lock() - if self._tmp_dir is not None: - # Under Windows file path length is limited so we hash it - filename = hashlib.md5(self._claimfile.encode()).hexdigest() - self._claimfile = os.path.join(self._tmp_dir, filename + ".lock") + def __exit__(self, typ, value, tbck): + self.unlock() diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 0502adbcc2..70acdabc04 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -133,13 +133,12 @@ def _ignore(self): updater = Updater(self.dvc_dir) - flist = [ - self.lock.lockfile, - self.lock.tmp_dir, - self.config.config_local_file, - updater.updater_file, - updater.lock.lockfile, - ] + self.state.files + flist = ( + [self.config.config_local_file, updater.updater_file] + + self.state.files + + self.lock.files + + updater.lock.files + ) if self.cache.local.cache_dir.startswith(self.root_dir): flist += [self.cache.local.cache_dir] diff --git a/setup.py b/setup.py index c80bb2ac2d..cc811fa86b 100644 --- a/setup.py +++ b/setup.py @@ -138,7 +138,11 @@ def run(self): "ssh_gssapi": ssh_gssapi, "hdfs": hdfs, # NOTE: https://github.com/inveniosoftware/troubleshooting/issues/1 - ":python_version=='2.7'": ["futures", "pathlib2", "flufl.lock==2.4.1"], + ":python_version=='2.7'": [ + "futures", + "pathlib2", + "zc.lockfile>=1.2.1", + ], ":python_version>='3.0'": ["flufl.lock>=3.2"], "tests": tests_requirements, }, From 3c3378bdca7060238fc8dddc06e4ce86e363f3bf Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Wed, 11 Sep 2019 14:21:12 +0700 Subject: [PATCH 5/5] dvc: make lock lifetime long by default --- dvc/lock.py | 6 +++--- dvc/repo/__init__.py | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/dvc/lock.py b/dvc/lock.py index eb691f33d6..f552cbae67 100644 --- a/dvc/lock.py +++ b/dvc/lock.py @@ -35,7 +35,9 @@ class Lock(flufl.lock.Lock): """ def __init__(self, lockfile, lifetime=None, tmp_dir=None): - if isinstance(lifetime, int): + if lifetime is None: + lifetime = timedelta(days=365) # Lock for good by default + elif isinstance(lifetime, int): lifetime = timedelta(seconds=lifetime) self._tmp_dir = tmp_dir @@ -81,8 +83,6 @@ class Lock(object): Uses zc.lockfile as backend. """ - TIMEOUT = 5 - def __init__(self, lockfile, lifetime=None, tmp_dir=None): self.lockfile = lockfile self._lock = None diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 70acdabc04..b1ca827e6b 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -72,8 +72,6 @@ def __init__(self, root_dir=None): self.lock = Lock( os.path.join(self.dvc_dir, "lock"), - # This should be longer than operations we are protecting - lifetime=60 * 60 * 24 * 30, tmp_dir=os.path.join(self.dvc_dir, "tmp"), ) # NOTE: storing state and link_state in the repository itself to avoid