diff --git a/dvc/analytics.py b/dvc/analytics.py index 6643db7684..e4b0dc425e 100644 --- a/dvc/analytics.py +++ b/dvc/analytics.py @@ -66,7 +66,7 @@ def __init__(self, info=None): raise self.user_id_file = os.path.join(cdir, self.USER_ID_FILE) - self.user_id_file_lock = Lock(cdir, self.USER_ID_FILE + ".lock") + self.user_id_file_lock = Lock(self.user_id_file + ".lock") @staticmethod def load(path): @@ -113,7 +113,7 @@ def _get_user_id(self): return user_id except LockError: msg = "Failed to acquire '{}'" - logger.debug(msg.format(self.user_id_file_lock.lock_file)) + logger.debug(msg.format(self.user_id_file_lock.lockfile)) def _collect_windows(self): import sys diff --git a/dvc/lock.py b/dvc/lock.py index 81356a0708..f552cbae67 100644 --- a/dvc/lock.py +++ b/dvc/lock.py @@ -3,58 +3,118 @@ from __future__ import unicode_literals import os +import hashlib import time -import zc.lockfile - -from dvc.exceptions import DvcException +from datetime import timedelta +from funcy.py3 import lkeep -class LockError(DvcException): - """Thrown when unable to acquire the lock for dvc repo.""" +from dvc.exceptions import DvcException +from dvc.utils import makedirs +from dvc.utils.compat import is_py3 -class Lock(object): - """Class for dvc repo lock. +DEFAULT_TIMEOUT = 5 - Args: - dvc_dir (str): path to the directory that the lock should be created - in. - name (str): name of the lock file. - """ - LOCK_FILE = "lock" - TIMEOUT = 5 +class LockError(DvcException): + """Thrown when unable to acquire the lock for dvc repo.""" - def __init__(self, dvc_dir, name=LOCK_FILE): - self.lock_file = os.path.join(dvc_dir, name) - self._lock = None - def _do_lock(self): - try: - self._lock = zc.lockfile.LockFile(self.lock_file) - except zc.lockfile.LockError: - raise LockError( - "cannot perform the cmd since DVC is busy and " - "locked. Please retry the cmd later." - ) +if is_py3: + import flufl.lock + + class Lock(flufl.lock.Lock): + """Class for dvc repo lock. + + Args: + lockfile (str): the lock filename + in. + lifetime (int | timedelta): hold the lock for so long. + tmp_dir (str): a directory to store claim files. + """ + + def __init__(self, lockfile, lifetime=None, tmp_dir=None): + if lifetime is None: + lifetime = timedelta(days=365) # Lock for good by default + elif isinstance(lifetime, int): + lifetime = timedelta(seconds=lifetime) + + self._tmp_dir = tmp_dir + if self._tmp_dir is not None: + makedirs(self._tmp_dir, exist_ok=True) + + super(Lock, self).__init__(lockfile, lifetime=lifetime) + + @property + def lockfile(self): + return self._lockfile + + @property + def files(self): + return lkeep([self._lockfile, self._tmp_dir]) + + def lock(self): + try: + super(Lock, self).lock(timedelta(seconds=DEFAULT_TIMEOUT)) + except flufl.lock.TimeOutError: + raise LockError( + "cannot perform the cmd since DVC is busy and " + "locked. Please retry the cmd later." + ) + + def _set_claimfile(self, pid=None): + super(Lock, self)._set_claimfile(pid) + + if self._tmp_dir is not None: + # Under Windows file path length is limited so we hash it + filename = hashlib.md5(self._claimfile.encode()).hexdigest() + self._claimfile = os.path.join( + self._tmp_dir, filename + ".lock" + ) + + +else: + import zc.lockfile + + class Lock(object): + """Class for dvc repo lock. + + Uses zc.lockfile as backend. + """ + + def __init__(self, lockfile, lifetime=None, tmp_dir=None): + self.lockfile = lockfile + self._lock = None + + @property + def files(self): + return [self.lockfile] + + def _do_lock(self): + try: + self._lock = zc.lockfile.LockFile(self.lockfile) + except zc.lockfile.LockError: + raise LockError( + "cannot perform the cmd since DVC is busy and " + "locked. Please retry the cmd later." + ) + + def lock(self): + try: + self._do_lock() + return + except LockError: + time.sleep(DEFAULT_TIMEOUT) - def lock(self): - """Acquire lock for dvc repo.""" - try: self._do_lock() - return - except LockError: - time.sleep(self.TIMEOUT) - - self._do_lock() - def unlock(self): - """Release lock for dvc repo.""" - self._lock.close() - self._lock = None + def unlock(self): + self._lock.close() + self._lock = None - def __enter__(self): - self.lock() + def __enter__(self): + self.lock() - def __exit__(self, typ, value, tbck): - self.unlock() + def __exit__(self, typ, value, tbck): + self.unlock() diff --git a/dvc/logger.py b/dvc/logger.py index f9fe77b8d1..cafedd728c 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -196,6 +196,10 @@ def setup(level=logging.INFO): "level": logging.CRITICAL, "handlers": ["console", "console_errors"], }, + "flufl.lock": { + "level": logging.CRITICAL, + "handlers": ["console", "console_errors"], + }, }, } ) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 2f743a6ff9..b1ca827e6b 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -70,7 +70,10 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) - self.lock = Lock(self.dvc_dir) + self.lock = Lock( + os.path.join(self.dvc_dir, "lock"), + tmp_dir=os.path.join(self.dvc_dir, "tmp"), + ) # NOTE: storing state and link_state in the repository itself to avoid # any possible state corruption in 'shared cache dir' scenario. self.state = State(self, self.config.config) @@ -128,12 +131,12 @@ def _ignore(self): updater = Updater(self.dvc_dir) - flist = [ - self.lock.lock_file, - self.config.config_local_file, - updater.updater_file, - updater.lock.lock_file, - ] + self.state.files + flist = ( + [self.config.config_local_file, updater.updater_file] + + self.state.files + + self.lock.files + + updater.lock.files + ) if self.cache.local.cache_dir.startswith(self.root_dir): flist += [self.cache.local.cache_dir] diff --git a/dvc/state.py b/dvc/state.py index 4a2529b225..61f82549d9 100644 --- a/dvc/state.py +++ b/dvc/state.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import os +import re import sqlite3 import logging @@ -10,7 +11,8 @@ from dvc.utils import remove, current_timestamp, relpath, to_chunks from dvc.exceptions import DvcException from dvc.utils.fs import get_mtime_and_size, get_inode -from dvc.utils.compat import fspath_py35 +from dvc.utils.compat import fspath_py35, urlencode, urlunparse, is_py2 + SQLITE_MAX_VARIABLES_NUMBER = 999 @@ -213,7 +215,11 @@ def load(self): assert self.cursor is None assert self.inserts == 0 empty = not os.path.exists(self.state_file) - self.database = sqlite3.connect(self.state_file) + # NOTE: we use nolock option because fcntl() lock sqlite uses + # doesn't work on some older NFS/CIFS filesystems. + # This opens a possibility of data corruption by concurrent writes, + # which is prevented by repo lock. + self.database = _connect_sqlite(self.state_file, {"nolock": 1}) self.cursor = self.database.cursor() # Try loading once to check that the file is indeed a database @@ -473,3 +479,25 @@ def remove_unused_links(self, used): self.LINK_STATE_TABLE, ",".join(["?"] * len(chunk_unused)) ) self._execute(cmd, tuple(chunk_unused)) + + +def _connect_sqlite(filename, options): + # Connect by URI was added in Python 3.4 and sqlite 3.7.7, + # we ignore options, which should be fine unless repo is on old NFS/CIFS + if is_py2 or sqlite3.sqlite_version_info < (3, 7, 7): + return sqlite3.connect(filename) + + uri = _build_sqlite_uri(filename, options) + return sqlite3.connect(uri, uri=True) + + +def _build_sqlite_uri(filename, options): + # Convert filename to uri according to https://www.sqlite.org/uri.html, 3.1 + uri_path = filename.replace("?", "%3f").replace("#", "%23") + if os.name == "nt": + uri_path = uri_path.replace("\\", "/") + uri_path = re.sub(r"^([a-z]:)", "/\\1", uri_path, flags=re.I) + uri_path = re.sub(r"/+", "/", uri_path) + + # Empty netloc, params and fragment + return urlunparse(("file", "", uri_path, "", urlencode(options), "")) diff --git a/dvc/updater.py b/dvc/updater.py index 008e37ea7b..388d8c94b7 100644 --- a/dvc/updater.py +++ b/dvc/updater.py @@ -25,7 +25,9 @@ class Updater(object): # pragma: no cover def __init__(self, dvc_dir): self.dvc_dir = dvc_dir self.updater_file = os.path.join(dvc_dir, self.UPDATER_FILE) - self.lock = Lock(dvc_dir, self.updater_file + ".lock") + self.lock = Lock( + self.updater_file + ".lock", tmp_dir=os.path.join(dvc_dir, "tmp") + ) self.current = parse_version(__version__).base_version def _is_outdated_file(self): @@ -41,7 +43,7 @@ def _with_lock(self, func, action): func() except LockError: msg = "Failed to acquire '{}' before {} updates" - logger.debug(msg.format(self.lock.lock_file, action)) + logger.debug(msg.format(self.lock.lockfile, action)) def check(self): if os.getenv("CI") or os.getenv("DVC_TEST"): diff --git a/dvc/utils/compat.py b/dvc/utils/compat.py index 9dad37d3f1..f45becc9c7 100644 --- a/dvc/utils/compat.py +++ b/dvc/utils/compat.py @@ -101,7 +101,8 @@ def ignore_file_not_found(): if is_py2: - from urlparse import urlparse, urljoin, urlsplit, urlunsplit # noqa: F401 + from urlparse import urlparse, urlunparse, urljoin # noqa: F401 + from urllib import urlencode # noqa: F401 from BaseHTTPServer import HTTPServer # noqa: F401 from SimpleHTTPServer import SimpleHTTPRequestHandler # noqa: F401 import ConfigParser # noqa: F401 @@ -144,9 +145,9 @@ def __exit__(self, *args): from os import makedirs # noqa: F401 from urllib.parse import ( # noqa: F401 urlparse, # noqa: F401 + urlunparse, # noqa: F401 + urlencode, # noqa: F401 urljoin, # noqa: F401 - urlsplit, # noqa: F401 - urlunsplit, # noqa: F401 ) from io import StringIO, BytesIO # noqa: F401 from http.server import ( # noqa: F401 diff --git a/setup.py b/setup.py index 94c3b5153d..cc811fa86b 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,6 @@ def run(self): install_requires = [ "ply>=3.9", # See https://github.com/pyinstaller/pyinstaller/issues/1945 "configparser>=3.5.0", - "zc.lockfile>=1.2.1", "future>=0.16.0", "colorama>=0.3.9", "configobj>=5.0.6", @@ -139,7 +138,12 @@ def run(self): "ssh_gssapi": ssh_gssapi, "hdfs": hdfs, # NOTE: https://github.com/inveniosoftware/troubleshooting/issues/1 - ":python_version=='2.7'": ["futures", "pathlib2"], + ":python_version=='2.7'": [ + "futures", + "pathlib2", + "zc.lockfile>=1.2.1", + ], + ":python_version>='3.0'": ["flufl.lock>=3.2"], "tests": tests_requirements, }, keywords="data science, data version control, machine learning", diff --git a/tests/func/test_lock.py b/tests/func/test_lock.py index 1c5a0bda23..ad04f22edc 100644 --- a/tests/func/test_lock.py +++ b/tests/func/test_lock.py @@ -1,3 +1,5 @@ +import os + from dvc.lock import LockError from dvc.main import main from dvc.lock import Lock @@ -7,15 +9,17 @@ class TestLock(TestDvc): def test_with(self): - lock = Lock(self.dvc.dvc_dir) + lockfile = os.path.join(self.dvc.dvc_dir, "lock") + lock = Lock(lockfile) with lock: with self.assertRaises(LockError): - lock2 = Lock(self.dvc.dvc_dir) + lock2 = Lock(lockfile) with lock2: self.assertTrue(False) def test_cli(self): - lock = Lock(self.dvc.dvc_dir) + lockfile = os.path.join(self.dvc.dvc_dir, "lock") + lock = Lock(lockfile) with lock: ret = main(["add", self.FOO]) self.assertEqual(ret, 1)