Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dvc/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self, info=None):
raise

self.user_id_file = os.path.join(cdir, self.USER_ID_FILE)
self.user_id_file_lock = Lock(cdir, self.USER_ID_FILE + ".lock")
self.user_id_file_lock = Lock(self.user_id_file + ".lock")

@staticmethod
def load(path):
Expand Down Expand Up @@ -113,7 +113,7 @@ def _get_user_id(self):
return user_id
except LockError:
msg = "Failed to acquire '{}'"
logger.debug(msg.format(self.user_id_file_lock.lock_file))
logger.debug(msg.format(self.user_id_file_lock.lockfile))

def _collect_windows(self):
import sys
Expand Down
142 changes: 101 additions & 41 deletions dvc/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,118 @@
from __future__ import unicode_literals

import os
import hashlib
import time
import zc.lockfile

from dvc.exceptions import DvcException
from datetime import timedelta

from funcy.py3 import lkeep

class LockError(DvcException):
"""Thrown when unable to acquire the lock for dvc repo."""
from dvc.exceptions import DvcException
from dvc.utils import makedirs
from dvc.utils.compat import is_py3


class Lock(object):
"""Class for dvc repo lock.
DEFAULT_TIMEOUT = 5

Args:
dvc_dir (str): path to the directory that the lock should be created
in.
name (str): name of the lock file.
"""

LOCK_FILE = "lock"
TIMEOUT = 5
class LockError(DvcException):
"""Thrown when unable to acquire the lock for dvc repo."""

def __init__(self, dvc_dir, name=LOCK_FILE):
self.lock_file = os.path.join(dvc_dir, name)
self._lock = None

def _do_lock(self):
try:
self._lock = zc.lockfile.LockFile(self.lock_file)
except zc.lockfile.LockError:
raise LockError(
"cannot perform the cmd since DVC is busy and "
"locked. Please retry the cmd later."
)
if is_py3:
import flufl.lock

class Lock(flufl.lock.Lock):
"""Class for dvc repo lock.
Args:
lockfile (str): the lock filename.
lifetime (int | timedelta): hold the lock for so long.
tmp_dir (str): a directory to store claim files.
"""

def __init__(self, lockfile, lifetime=None, tmp_dir=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for setting a long default lifetime. I don't see the `lifetime` argument used anywhere now, though.

Suggested change
def __init__(self, lockfile, lifetime=None, tmp_dir=None):
def __init__(self, lockfile, tmp_dir=None):

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record: will remove this myself right after merge to move along faster.

if lifetime is None:
lifetime = timedelta(days=365) # Lock for good by default
elif isinstance(lifetime, int):
lifetime = timedelta(seconds=lifetime)

self._tmp_dir = tmp_dir
if self._tmp_dir is not None:
makedirs(self._tmp_dir, exist_ok=True)

super(Lock, self).__init__(lockfile, lifetime=lifetime)

@property
def lockfile(self):
return self._lockfile

@property
def files(self):
return lkeep([self._lockfile, self._tmp_dir])

def lock(self):
try:
super(Lock, self).lock(timedelta(seconds=DEFAULT_TIMEOUT))
except flufl.lock.TimeOutError:
raise LockError(
"cannot perform the cmd since DVC is busy and "
"locked. Please retry the cmd later."
)

def _set_claimfile(self, pid=None):
super(Lock, self)._set_claimfile(pid)

if self._tmp_dir is not None:
# Under Windows file path length is limited so we hash it
filename = hashlib.md5(self._claimfile.encode()).hexdigest()
self._claimfile = os.path.join(
self._tmp_dir, filename + ".lock"
)


else:
import zc.lockfile

class Lock(object):
"""Class for dvc repo lock.
Uses zc.lockfile as backend.
"""

def __init__(self, lockfile, lifetime=None, tmp_dir=None):
self.lockfile = lockfile
self._lock = None

@property
def files(self):
return [self.lockfile]

def _do_lock(self):
try:
self._lock = zc.lockfile.LockFile(self.lockfile)
except zc.lockfile.LockError:
raise LockError(
"cannot perform the cmd since DVC is busy and "
"locked. Please retry the cmd later."
)

def lock(self):
try:
self._do_lock()
return
except LockError:
time.sleep(DEFAULT_TIMEOUT)

def lock(self):
"""Acquire lock for dvc repo."""
try:
self._do_lock()
return
except LockError:
time.sleep(self.TIMEOUT)

self._do_lock()

def unlock(self):
"""Release lock for dvc repo."""
self._lock.close()
self._lock = None
def unlock(self):
self._lock.close()
self._lock = None

def __enter__(self):
self.lock()
def __enter__(self):
self.lock()

def __exit__(self, typ, value, tbck):
self.unlock()
def __exit__(self, typ, value, tbck):
self.unlock()
4 changes: 4 additions & 0 deletions dvc/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ def setup(level=logging.INFO):
"level": logging.CRITICAL,
"handlers": ["console", "console_errors"],
},
"flufl.lock": {
"level": logging.CRITICAL,
"handlers": ["console", "console_errors"],
},
},
}
)
17 changes: 10 additions & 7 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ def __init__(self, root_dir=None):

self.tree = WorkingTree(self.root_dir)

self.lock = Lock(self.dvc_dir)
self.lock = Lock(
os.path.join(self.dvc_dir, "lock"),
tmp_dir=os.path.join(self.dvc_dir, "tmp"),
)
# NOTE: storing state and link_state in the repository itself to avoid
# any possible state corruption in 'shared cache dir' scenario.
self.state = State(self, self.config.config)
Expand Down Expand Up @@ -128,12 +131,12 @@ def _ignore(self):

updater = Updater(self.dvc_dir)

flist = [
self.lock.lock_file,
self.config.config_local_file,
updater.updater_file,
updater.lock.lock_file,
] + self.state.files
flist = (
[self.config.config_local_file, updater.updater_file]
+ self.state.files
+ self.lock.files
+ updater.lock.files
)

if self.cache.local.cache_dir.startswith(self.root_dir):
flist += [self.cache.local.cache_dir]
Expand Down
32 changes: 30 additions & 2 deletions dvc/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
from __future__ import unicode_literals

import os
import re
import sqlite3
import logging

from dvc.config import Config
from dvc.utils import remove, current_timestamp, relpath, to_chunks
from dvc.exceptions import DvcException
from dvc.utils.fs import get_mtime_and_size, get_inode
from dvc.utils.compat import fspath_py35
from dvc.utils.compat import fspath_py35, urlencode, urlunparse, is_py2


SQLITE_MAX_VARIABLES_NUMBER = 999

Expand Down Expand Up @@ -213,7 +215,11 @@ def load(self):
assert self.cursor is None
assert self.inserts == 0
empty = not os.path.exists(self.state_file)
self.database = sqlite3.connect(self.state_file)
# NOTE: we use the nolock option because the fcntl() locks that sqlite
# relies on don't work on some older NFS/CIFS filesystems.
# This opens up the possibility of data corruption by concurrent writes,
# which is prevented by the repo lock.
self.database = _connect_sqlite(self.state_file, {"nolock": 1})
self.cursor = self.database.cursor()

# Try loading once to check that the file is indeed a database
Expand Down Expand Up @@ -473,3 +479,25 @@ def remove_unused_links(self, used):
self.LINK_STATE_TABLE, ",".join(["?"] * len(chunk_unused))
)
self._execute(cmd, tuple(chunk_unused))


def _connect_sqlite(filename, options):
    """Open an sqlite3 connection to ``filename``, passing ``options`` by URI.

    Args:
        filename (str): path to the database file.
        options (dict): sqlite URI query parameters (e.g. ``{"nolock": 1}``).

    Returns:
        sqlite3.Connection: the opened connection.
    """
    # Connecting by URI requires Python 3.4+ and sqlite 3.7.7+.
    uri_supported = not is_py2 and sqlite3.sqlite_version_info >= (3, 7, 7)
    if uri_supported:
        return sqlite3.connect(_build_sqlite_uri(filename, options), uri=True)

    # Without URI support the options are ignored, which should be fine
    # unless the repo is on an old NFS/CIFS filesystem.
    return sqlite3.connect(filename)


def _build_sqlite_uri(filename, options):
# Convert filename to uri according to https://www.sqlite.org/uri.html, 3.1
uri_path = filename.replace("?", "%3f").replace("#", "%23")
if os.name == "nt":
uri_path = uri_path.replace("\\", "/")
uri_path = re.sub(r"^([a-z]:)", "/\\1", uri_path, flags=re.I)
uri_path = re.sub(r"/+", "/", uri_path)

# Empty netloc, params and fragment
return urlunparse(("file", "", uri_path, "", urlencode(options), ""))
6 changes: 4 additions & 2 deletions dvc/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class Updater(object): # pragma: no cover
def __init__(self, dvc_dir):
self.dvc_dir = dvc_dir
self.updater_file = os.path.join(dvc_dir, self.UPDATER_FILE)
self.lock = Lock(dvc_dir, self.updater_file + ".lock")
self.lock = Lock(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't set lifetime here, is there some default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, 5 seconds. Plus 10 seconds, which flufl uses internally to defend against time desync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is not acceptable: I can see the updater taking that long on a bad network. Just set the longest time by default in Lock and don't configure it unnecessarily each time you use Lock. That is the way our old locks worked, and it should stay that way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Suor ?

self.updater_file + ".lock", tmp_dir=os.path.join(dvc_dir, "tmp")
)
self.current = parse_version(__version__).base_version

def _is_outdated_file(self):
Expand All @@ -41,7 +43,7 @@ def _with_lock(self, func, action):
func()
except LockError:
msg = "Failed to acquire '{}' before {} updates"
logger.debug(msg.format(self.lock.lock_file, action))
logger.debug(msg.format(self.lock.lockfile, action))

def check(self):
if os.getenv("CI") or os.getenv("DVC_TEST"):
Expand Down
7 changes: 4 additions & 3 deletions dvc/utils/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def ignore_file_not_found():


if is_py2:
from urlparse import urlparse, urljoin, urlsplit, urlunsplit # noqa: F401
from urlparse import urlparse, urlunparse, urljoin # noqa: F401
from urllib import urlencode # noqa: F401
from BaseHTTPServer import HTTPServer # noqa: F401
from SimpleHTTPServer import SimpleHTTPRequestHandler # noqa: F401
import ConfigParser # noqa: F401
Expand Down Expand Up @@ -144,9 +145,9 @@ def __exit__(self, *args):
from os import makedirs # noqa: F401
from urllib.parse import ( # noqa: F401
urlparse, # noqa: F401
urlunparse, # noqa: F401
urlencode, # noqa: F401
urljoin, # noqa: F401
urlsplit, # noqa: F401
urlunsplit, # noqa: F401
)
from io import StringIO, BytesIO # noqa: F401
from http.server import ( # noqa: F401
Expand Down
8 changes: 6 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def run(self):
install_requires = [
"ply>=3.9", # See https://github.com/pyinstaller/pyinstaller/issues/1945
"configparser>=3.5.0",
"zc.lockfile>=1.2.1",
"future>=0.16.0",
"colorama>=0.3.9",
"configobj>=5.0.6",
Expand Down Expand Up @@ -139,7 +138,12 @@ def run(self):
"ssh_gssapi": ssh_gssapi,
"hdfs": hdfs,
# NOTE: https://github.com/inveniosoftware/troubleshooting/issues/1
":python_version=='2.7'": ["futures", "pathlib2"],
":python_version=='2.7'": [
"futures",
"pathlib2",
"zc.lockfile>=1.2.1",
],
":python_version>='3.0'": ["flufl.lock>=3.2"],
"tests": tests_requirements,
},
keywords="data science, data version control, machine learning",
Expand Down
10 changes: 7 additions & 3 deletions tests/func/test_lock.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from dvc.lock import LockError
from dvc.main import main
from dvc.lock import Lock
Expand All @@ -7,15 +9,17 @@

class TestLock(TestDvc):
def test_with(self):
lock = Lock(self.dvc.dvc_dir)
lockfile = os.path.join(self.dvc.dvc_dir, "lock")
lock = Lock(lockfile)
with lock:
with self.assertRaises(LockError):
lock2 = Lock(self.dvc.dvc_dir)
lock2 = Lock(lockfile)
with lock2:
self.assertTrue(False)

def test_cli(self):
lock = Lock(self.dvc.dvc_dir)
lockfile = os.path.join(self.dvc.dvc_dir, "lock")
lock = Lock(lockfile)
with lock:
ret = main(["add", self.FOO])
self.assertEqual(ret, 1)