From 0fdb88a54caaf2b63428b003215021b8004352c4 Mon Sep 17 00:00:00 2001 From: pawel Date: Thu, 28 Nov 2019 17:52:54 +0100 Subject: [PATCH 1/8] remote: s3: adjust jobs number basing on file descriptors number --- dvc/remote/base.py | 16 ++++++++++++++-- dvc/remote/local.py | 2 +- dvc/remote/s3.py | 18 ++++++++++++++++++ dvc/remote/ssh/__init__.py | 10 +++++++--- 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index f72e56127d..85f6401c4d 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -13,6 +13,8 @@ from shortuuid import uuid +from funcy import cached_property + import dvc.prompt as prompt from dvc.config import Config from dvc.exceptions import ( @@ -75,7 +77,6 @@ class RemoteBASE(object): scheme = "base" path_cls = URLInfo REQUIRES = {} - JOBS = 4 * cpu_count() PARAM_RELPATH = "relpath" CHECKSUM_DIR_SUFFIX = ".dir" @@ -84,6 +85,10 @@ class RemoteBASE(object): state = StateNoop() + @cached_property + def jobs(self): + return cpu_count() * 4 + def __init__(self, repo, config): self.repo = repo @@ -746,6 +751,11 @@ def changed_cache(self, checksum): return self._changed_dir_cache(checksum) return self.changed_cache_file(checksum) + def _adjust_jobs(self, jobs=None): + if not jobs: + jobs = self.jobs + return jobs + def cache_exists(self, checksums, jobs=None, name=None): """Check if the given checksums are stored in the remote. @@ -784,7 +794,9 @@ def exists_with_progress(path_info): pbar.update_desc(str(path_info)) return ret - with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: + with ThreadPoolExecutor( + max_workers=self._adjust_jobs(jobs) + ) as executor: path_infos = map(self.checksum_to_path_info, checksums) in_remote = executor.map(exists_with_progress, path_infos) ret = list(itertools.compress(checksums, in_remote)) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 9c68e3a808..40c4cd31be 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -339,7 +339,7 @@ def _process( status = STATUS_NEW if jobs is None: - jobs = remote.JOBS + jobs = remote.jobs status_info = self.status( named_cache, diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 6366fc426c..329b9a14f9 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -3,6 +3,7 @@ import logging import os +import resource import threading from funcy import cached_property, wrap_prop @@ -335,3 +336,20 @@ def _append_aws_grants_to_extra_args(self, config): ) self.extra_args[extra_args_key] = config.get(grant_option) + + def _adjust_jobs(self, jobs=None): + jobs = super(RemoteS3, self)._adjust_jobs(jobs) + + descriptor_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + estimated_descriptors_num = jobs * 20 + if estimated_descriptors_num <= descriptor_limit - 10: + return jobs + + jobs = (descriptor_limit - 10) // 20 + logger.warning( + "Parallelization reduced to '{}' jobs. Increase open " + "file descriptors limit to more than '{}' to prevent " + "the " + "reduction.".format(jobs, estimated_descriptors_num + 10) + ) + return jobs diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index 78da815e1a..87440672eb 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -40,7 +40,6 @@ class RemoteSSH(RemoteBASE): scheme = Schemes.SSH REQUIRES = {"paramiko": "paramiko"} - JOBS = 4 PARAM_CHECKSUM = "md5" DEFAULT_PORT = 22 TIMEOUT = 1800 @@ -51,6 +50,10 @@ class RemoteSSH(RemoteBASE): DEFAULT_CACHE_TYPES = ["copy"] + @property + def jobs(self): + return 4 + def __init__(self, repo, config): super(RemoteSSH, self).__init__(repo, config) url = config.get(Config.SECTION_REMOTE_URL) @@ -325,9 +328,10 @@ def cache_exists(self, checksums, jobs=None, name=None): def exists_with_progress(chunks): return self.batch_exists(chunks, callback=pbar.update_desc) - with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: + jobs = self._adjust_jobs(jobs) + with ThreadPoolExecutor(max_workers=jobs) as executor: path_infos = [self.checksum_to_path_info(x) for x in checksums] - chunks = to_chunks(path_infos, num_chunks=self.JOBS) + chunks = to_chunks(path_infos, num_chunks=jobs) results = executor.map(exists_with_progress, chunks) in_remote = itertools.chain.from_iterable(results) ret = list(itertools.compress(checksums, in_remote)) From c167eb2675639f80d4154fd93f1a312e73941d1a Mon Sep 17 00:00:00 2001 From: pawel Date: Thu, 5 Dec 2019 14:45:13 +0100 Subject: [PATCH 2/8] remote: s3: adjust jobs only on default --- dvc/remote/base.py | 15 ++++-------- dvc/remote/local.py | 3 +-- dvc/remote/s3.py | 46 +++++++++++++++++++++++++++--------- dvc/remote/ssh/__init__.py | 10 +++----- tests/unit/remote/test_s3.py | 33 +++++++++++++++++++++++++- tests/utils/__init__.py | 11 +++++++++ 6 files changed, 86 insertions(+), 32 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 85f6401c4d..81e22fa72d 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -77,6 +77,7 @@ class RemoteBASE(object): scheme = "base" path_cls = URLInfo REQUIRES = {} + JOBS = 4 * cpu_count() PARAM_RELPATH = "relpath" CHECKSUM_DIR_SUFFIX = ".dir" @@ -85,9 +86,8 @@ class RemoteBASE(object): state = StateNoop() - @cached_property - def jobs(self): - return cpu_count() * 4 + def adjust_jobs(self, jobs=None): + return jobs or self.JOBS def __init__(self, repo, config): self.repo = repo @@ -751,11 +751,6 @@ def changed_cache(self, checksum): return self._changed_dir_cache(checksum) return self.changed_cache_file(checksum) - def _adjust_jobs(self, jobs=None): - if not jobs: - jobs = self.jobs - return jobs - def cache_exists(self, checksums, jobs=None, name=None): """Check if the given checksums are stored in the remote. @@ -794,9 +789,7 @@ def exists_with_progress(path_info): pbar.update_desc(str(path_info)) return ret - with ThreadPoolExecutor( - max_workers=self._adjust_jobs(jobs) - ) as executor: + with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: path_infos = map(self.checksum_to_path_info, checksums) in_remote = executor.map(exists_with_progress, path_infos) ret = list(itertools.compress(checksums, in_remote)) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 40c4cd31be..c98a87f906 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -338,8 +338,7 @@ def _process( func = remote.upload status = STATUS_NEW - if jobs is None: - jobs = remote.jobs + jobs = remote.adjust_jobs(jobs) status_info = self.status( named_cache, diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 329b9a14f9..6366320fe3 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -277,6 +277,7 @@ def _upload(self, from_file, to_info, name=None, no_progress_bar=False): to_info.path, Callback=pbar.update, ExtraArgs=self.extra_args, + Config=self.transfer_config, ) def _download(self, from_info, to_file, name=None, no_progress_bar=False): @@ -290,7 +291,11 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): disable=no_progress_bar, total=total, bytes=True, desc=name ) as pbar: self.s3.download_file( - from_info.bucket, from_info.path, to_file, Callback=pbar.update + from_info.bucket, + from_info.path, + to_file, + Callback=pbar.update, + Config=self.transfer_config, ) def _generate_download_url(self, path_info, expires=3600): @@ -337,19 +342,38 @@ def _append_aws_grants_to_extra_args(self, config): self.extra_args[extra_args_key] = config.get(grant_option) - def _adjust_jobs(self, jobs=None): - jobs = super(RemoteS3, self)._adjust_jobs(jobs) + @cached_property + def transfer_config(self): + from boto3.s3.transfer import TransferConfig + + return TransferConfig() + + def adjust_jobs(self, jobs=None): + jobs_declared = bool(jobs) + jobs = jobs or self.JOBS descriptor_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - estimated_descriptors_num = jobs * 20 - if estimated_descriptors_num <= descriptor_limit - 10: + threads_per_job = self.transfer_config.max_request_concurrency + fds_per_thread = 2 # file and socket + safety_margin = 10 + + estimated_descriptors_num = jobs * threads_per_job * fds_per_thread + if estimated_descriptors_num <= descriptor_limit - safety_margin: return jobs - jobs = (descriptor_limit - 10) // 20 - logger.warning( - "Parallelization reduced to '{}' jobs. Increase open " - "file descriptors limit to more than '{}' to prevent " - "the " - "reduction.".format(jobs, estimated_descriptors_num + 10) + safe_jobs_number = (descriptor_limit - safety_margin) // ( + threads_per_job * fds_per_thread ) + safe_descriptors_limit = estimated_descriptors_num + safety_margin + if jobs_declared: + logger.warning( + "Provided jobs number '{}' might result in 'Too many open " + "files error'. Consider decreasing jobs number to '{}' or " + "increasing " + "file descriptors limit to '{}'.".format( + jobs, safe_jobs_number, safe_descriptors_limit + ) + ) + else: + jobs = safe_jobs_number return jobs diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index 87440672eb..78da815e1a 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -40,6 +40,7 @@ class RemoteSSH(RemoteBASE): scheme = Schemes.SSH REQUIRES = {"paramiko": "paramiko"} + JOBS = 4 PARAM_CHECKSUM = "md5" DEFAULT_PORT = 22 TIMEOUT = 1800 @@ -50,10 +51,6 @@ class RemoteSSH(RemoteBASE): DEFAULT_CACHE_TYPES = ["copy"] - @property - def jobs(self): - return 4 - def __init__(self, repo, config): super(RemoteSSH, self).__init__(repo, config) url = config.get(Config.SECTION_REMOTE_URL) @@ -328,10 +325,9 @@ def cache_exists(self, checksums, jobs=None, name=None): def exists_with_progress(chunks): return self.batch_exists(chunks, callback=pbar.update_desc) - jobs = self._adjust_jobs(jobs) - with ThreadPoolExecutor(max_workers=jobs) as executor: + with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: path_infos = [self.checksum_to_path_info(x) for x in checksums] - chunks = to_chunks(path_infos, num_chunks=jobs) + chunks = to_chunks(path_infos, num_chunks=self.JOBS) results = executor.map(exists_with_progress, chunks) in_remote = itertools.chain.from_iterable(results) ret = list(itertools.compress(checksums, in_remote)) diff --git a/tests/unit/remote/test_s3.py b/tests/unit/remote/test_s3.py index 57fcfcec79..94bc0ddbfa 100644 --- a/tests/unit/remote/test_s3.py +++ b/tests/unit/remote/test_s3.py @@ -2,7 +2,7 @@ from dvc.config import ConfigError from dvc.remote.s3 import RemoteS3 - +from tests.utils import empty_caplog bucket_name = "bucket-name" prefix = "some/prefix" @@ -54,3 +54,34 @@ def test_grants_mutually_exclusive_acl_error(grants): with pytest.raises(ConfigError): RemoteS3(None, config) + + +@pytest.mark.parametrize( + "default_jobs_number,expected_result", [(10, 10), (13, 12)] +) +@patch("resource.getrlimit", return_value=(256, 1024)) +def test_adjust_default_jobs_number( + _, caplog, default_jobs_number, expected_result +): + remote = RemoteS3(None, {}) + + with empty_caplog(caplog), patch.object( + remote, "JOBS", default_jobs_number + ): + assert remote.adjust_jobs() == expected_result + + +@patch("resource.getrlimit", return_value=(256, 1024)) +def test_warn_on_too_many_jobs(_, caplog): + remote = RemoteS3(None, {}) + + with caplog.at_level(logging.INFO, "dvc"): + assert remote.adjust_jobs(64) == 64 + assert len(caplog.messages) == 1 + assert caplog.messages[0] == ( + "Provided jobs number '64' might result " + "in 'Too many open files error'. " + "Consider decreasing jobs number to '12' " + "or increasing file descriptors limit to " + "'1290'." + ) \ No newline at end of file diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index a40a91f2cf..d4569049a7 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -45,3 +45,14 @@ def trees_equal(dir_path_1, dir_path_2): def to_posixpath(path): return path.replace("\\", "/") + + +class empty_caplog(object): + def __init__(self, caplog): + self.caplog = caplog + + def __enter__(self): + self.caplog.clear() + + def __exit__(self, exc_type, exc_val, exc_tb): + assert self.caplog.text == "" From 710c4bd32e35fa193a52e7a9fb4bd3daaf645033 Mon Sep 17 00:00:00 2001 From: pawel Date: Thu, 5 Dec 2019 15:03:54 +0100 Subject: [PATCH 3/8] s3: test: mocker instead of mock annotations --- tests/unit/remote/test_s3.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/unit/remote/test_s3.py b/tests/unit/remote/test_s3.py index 94bc0ddbfa..d3f7325945 100644 --- a/tests/unit/remote/test_s3.py +++ b/tests/unit/remote/test_s3.py @@ -1,7 +1,8 @@ -import pytest +import logging -from dvc.config import ConfigError -from dvc.remote.s3 import RemoteS3 +import pytest +from mock import patch +from dvc.remote import RemoteS3 from tests.utils import empty_caplog bucket_name = "bucket-name" @@ -59,22 +60,23 @@ def test_grants_mutually_exclusive_acl_error(grants): @pytest.mark.parametrize( "default_jobs_number,expected_result", [(10, 10), (13, 12)] ) -@patch("resource.getrlimit", return_value=(256, 1024)) def test_adjust_default_jobs_number( - _, caplog, default_jobs_number, expected_result + mocker, caplog, default_jobs_number, expected_result ): remote = RemoteS3(None, {}) - with empty_caplog(caplog), patch.object( - remote, "JOBS", default_jobs_number - ): + mocker.patch("resource.getrlimit", return_value=(256, 1024)) + mocker.patch.object(remote, "JOBS", default_jobs_number) + + with empty_caplog(caplog): assert remote.adjust_jobs() == expected_result -@patch("resource.getrlimit", return_value=(256, 1024)) -def test_warn_on_too_many_jobs(_, caplog): +def test_warn_on_too_many_jobs(mocker, caplog): remote = RemoteS3(None, {}) + mocker.patch("resource.getrlimit", return_value=(256, 1024)) + with caplog.at_level(logging.INFO, "dvc"): assert remote.adjust_jobs(64) == 64 assert len(caplog.messages) == 1 From a0eda4c88484027d13a47f7a72448e1579605e3a Mon Sep 17 00:00:00 2001 From: pawel Date: Thu, 5 Dec 2019 16:57:26 +0100 Subject: [PATCH 4/8] remote: s3: reading ulimit/handles limit: support windows --- dvc/remote/s3.py | 10 ++++++++-- tests/unit/remote/test_s3.py | 11 +++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 6366320fe3..29c35c4419 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -3,7 +3,6 @@ import logging import os -import resource import threading from funcy import cached_property, wrap_prop @@ -352,7 +351,14 @@ def adjust_jobs(self, jobs=None): jobs_declared = bool(jobs) jobs = jobs or self.JOBS - descriptor_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + if os.name == "nt": + import win32file + + descriptor_limit = win32file._getmaxstdio() + else: + import resource + + descriptor_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] threads_per_job = self.transfer_config.max_request_concurrency fds_per_thread = 2 # file and socket safety_margin = 10 diff --git a/tests/unit/remote/test_s3.py b/tests/unit/remote/test_s3.py index d3f7325945..4b0e679fa4 100644 --- a/tests/unit/remote/test_s3.py +++ b/tests/unit/remote/test_s3.py @@ -1,4 +1,5 @@ import logging +import os import pytest from mock import patch @@ -65,7 +66,10 @@ def test_adjust_default_jobs_number( ): remote = RemoteS3(None, {}) - mocker.patch("resource.getrlimit", return_value=(256, 1024)) + if os.name == "nt": + mocker.patch("win32file._getmaxstdio", return_value=256) + else: + mocker.patch("resource.getrlimit", return_value=(256, 1024)) mocker.patch.object(remote, "JOBS", default_jobs_number) with empty_caplog(caplog): @@ -75,7 +79,10 @@ def test_adjust_default_jobs_number( def test_warn_on_too_many_jobs(mocker, caplog): remote = RemoteS3(None, {}) - mocker.patch("resource.getrlimit", return_value=(256, 1024)) + if os.name == "nt": + mocker.patch("win32file._getmaxstdio", return_value=256) + else: + mocker.patch("resource.getrlimit", return_value=(256, 1024)) with caplog.at_level(logging.INFO, "dvc"): assert remote.adjust_jobs(64) == 64 From d253f1319da03af6794cd788e2232aac084bcaa3 Mon Sep 17 00:00:00 2001 From: pawel Date: Fri, 13 Dec 2019 11:06:06 +0100 Subject: [PATCH 5/8] remote: upload/download: handle OSError --- dvc/exceptions.py | 9 +++++++++ dvc/remote/base.py | 36 +++++++++++++++++++++--------------- dvc/remote/s3.py | 18 ++++-------------- tests/unit/remote/test_s3.py | 1 - 4 files changed, 34 insertions(+), 30 deletions(-) diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 1beb8d6e1c..2b77f0d508 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -344,3 +344,12 @@ def __init__(self, path, external_repo_path, external_repo_url): class HTTPError(DvcException): def __init__(self, code, reason): super(HTTPError, self).__init__("'{} {}'".format(code, reason)) + + +class TooManyOpenFilesException(DvcException): + def __init__(self): + super(TooManyOpenFilesException, self).__init__( + "Operation failed due to too many open file descriptors. reduce " + "the number of jobs or increase open file descriptors limit to " + "prevent this." + ) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 81e22fa72d..a2324ba2c2 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -13,15 +13,11 @@ from shortuuid import uuid -from funcy import cached_property - import dvc.prompt as prompt from dvc.config import Config -from dvc.exceptions import ( - DvcException, - ConfirmRemoveError, - DvcIgnoreInCollectedDirError, -) +from dvc.exceptions import ConfirmRemoveError, TooManyOpenFilesException +from dvc.exceptions import DvcException +from dvc.exceptions import DvcIgnoreInCollectedDirError from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo from dvc.progress import Tqdm @@ -521,6 +517,16 @@ def _save(self, path_info, checksum): return self._save_file(path_info, checksum) + def _handle_transfer_exception( + self, from_info, to_info, exception, operation + ): + if isinstance(exception, OSError) and exception.errno == 24: + raise TooManyOpenFilesException() + + msg = "failed to {} '{}' to '{}'".format(operation, from_info, to_info) + logger.exception(msg) + return 1 + def upload(self, from_info, to_info, name=None, no_progress_bar=False): if not hasattr(self, "_upload"): raise RemoteActionNotImplemented("upload", self.scheme) @@ -542,10 +548,10 @@ def upload(self, from_info, to_info, name=None, no_progress_bar=False): name=name, no_progress_bar=no_progress_bar, ) - except Exception: - msg = "failed to upload '{}' to '{}'" - logger.exception(msg.format(from_info, to_info)) - return 1 # 1 fail + except Exception as e: + return self._handle_transfer_exception( + from_info, to_info, e, "upload" + ) return 0 @@ -619,10 +625,10 @@ def _download_file( self._download( from_info, tmp_file, name=name, no_progress_bar=no_progress_bar ) - except Exception: - msg = "failed to download '{}' to '{}'" - logger.exception(msg.format(from_info, to_info)) - return 1 # 1 fail + except Exception as e: + return self._handle_transfer_exception( + from_info, to_info, e, "download" + ) move(tmp_file, to_info, mode=file_mode) diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 29c35c4419..8d9fccb476 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -348,9 +348,6 @@ def transfer_config(self): return TransferConfig() def adjust_jobs(self, jobs=None): - jobs_declared = bool(jobs) - jobs = jobs or self.JOBS - if os.name == "nt": import win32file @@ -363,6 +360,9 @@ def adjust_jobs(self, jobs=None): fds_per_thread = 2 # file and socket safety_margin = 10 + jobs_declared = bool(jobs) + jobs = jobs or self.JOBS + estimated_descriptors_num = jobs * threads_per_job * fds_per_thread if estimated_descriptors_num <= descriptor_limit - safety_margin: return jobs @@ -370,16 +370,6 @@ def adjust_jobs(self, jobs=None): safe_jobs_number = (descriptor_limit - safety_margin) // ( threads_per_job * fds_per_thread ) - safe_descriptors_limit = estimated_descriptors_num + safety_margin - if jobs_declared: - logger.warning( - "Provided jobs number '{}' might result in 'Too many open " - "files error'. Consider decreasing jobs number to '{}' or " - "increasing " - "file descriptors limit to '{}'.".format( - jobs, safe_jobs_number, safe_descriptors_limit - ) - ) - else: + if not jobs_declared: jobs = safe_jobs_number return jobs diff --git a/tests/unit/remote/test_s3.py b/tests/unit/remote/test_s3.py index 4b0e679fa4..4537411580 100644 --- a/tests/unit/remote/test_s3.py +++ b/tests/unit/remote/test_s3.py @@ -2,7 +2,6 @@ import os import pytest -from mock import patch from dvc.remote import RemoteS3 from tests.utils import empty_caplog From 6ba4aba41964398b75977882e924dd4618f36276 Mon Sep 17 00:00:00 2001 From: pawel Date: Mon, 16 Dec 2019 08:23:53 +0100 Subject: [PATCH 6/8] remote: base: [down/up]load: raise on too many open files --- dvc/exceptions.py | 9 +++++---- dvc/remote/base.py | 11 +++++++---- tests/func/test_remote.py | 22 +++++++++++++++++++++- tests/unit/remote/test_s3.py | 23 ++--------------------- 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 2b77f0d508..1373179f51 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -346,10 +346,11 @@ def __init__(self, code, reason): super(HTTPError, self).__init__("'{} {}'".format(code, reason)) -class TooManyOpenFilesException(DvcException): - def __init__(self): - super(TooManyOpenFilesException, self).__init__( +class TooManyOpenFilesError(DvcException): + def __init__(self, cause): + super(TooManyOpenFilesError, self).__init__( "Operation failed due to too many open file descriptors. reduce " "the number of jobs or increase open file descriptors limit to " - "prevent this." + "prevent this.", + cause=cause, ) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index a2324ba2c2..5f184e4f3e 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -15,9 +15,12 @@ import dvc.prompt as prompt from dvc.config import Config -from dvc.exceptions import ConfirmRemoveError, TooManyOpenFilesException -from dvc.exceptions import DvcException -from dvc.exceptions import DvcIgnoreInCollectedDirError +from dvc.exceptions import ( + DvcException, + ConfirmRemoveError, + DvcIgnoreInCollectedDirError, + TooManyOpenFilesError, +) from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo from dvc.progress import Tqdm @@ -521,7 +524,7 @@ def _handle_transfer_exception( self, from_info, to_info, exception, operation ): if isinstance(exception, OSError) and exception.errno == 24: - raise TooManyOpenFilesException() + raise TooManyOpenFilesError(exception) msg = "failed to {} '{}' to '{}'".format(operation, from_info, to_info) logger.exception(msg) diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index 23f318861b..cf41a5f261 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -2,13 +2,16 @@ import shutil import configobj +import pytest from mock import patch from dvc.config import Config +from dvc.exceptions import TooManyOpenFilesError from dvc.main import main from dvc.path_info import PathInfo -from dvc.remote import RemoteLOCAL +from dvc.remote import RemoteLOCAL, RemoteConfig from dvc.remote.base import RemoteBASE +from dvc.utils.compat import fspath from tests.basic_env import TestDvc from tests.remotes import get_local_url, get_local_storagepath @@ -253,3 +256,20 @@ def unreliable_upload(self, from_file, to_info, name=None, **kwargs): def get_last_exc(caplog): _, exc, _ = caplog.records[-2].exc_info return exc + + +def test_raise_on_too_many_open_files(tmp_dir, dvc, tmp_path_factory, mocker): + storage = tmp_path_factory.mktemp("test_remote_base") + remote_config = RemoteConfig(dvc.config) + remote_config.add("local_remote", fspath(storage), default=True) + + tmp_dir.dvc_gen({"file": "file content"}) + + too_many_open_files_error = OSError() + mocker.patch.object(too_many_open_files_error, "errno", 24) + mocker.patch.object( + RemoteLOCAL, "_upload", side_effect=too_many_open_files_error + ) + + with pytest.raises(TooManyOpenFilesError): + dvc.push() diff --git a/tests/unit/remote/test_s3.py b/tests/unit/remote/test_s3.py index 4537411580..ddcf5e7f80 100644 --- a/tests/unit/remote/test_s3.py +++ b/tests/unit/remote/test_s3.py @@ -1,7 +1,8 @@ -import logging import os import pytest + +from dvc.config import ConfigError from dvc.remote import RemoteS3 from tests.utils import empty_caplog @@ -73,23 +74,3 @@ def test_adjust_default_jobs_number( with empty_caplog(caplog): assert remote.adjust_jobs() == expected_result - - -def test_warn_on_too_many_jobs(mocker, caplog): - remote = RemoteS3(None, {}) - - if os.name == "nt": - mocker.patch("win32file._getmaxstdio", return_value=256) - else: - mocker.patch("resource.getrlimit", return_value=(256, 1024)) - - with caplog.at_level(logging.INFO, "dvc"): - assert remote.adjust_jobs(64) == 64 - assert len(caplog.messages) == 1 - assert caplog.messages[0] == ( - "Provided jobs number '64' might result " - "in 'Too many open files error'. " - "Consider decreasing jobs number to '12' " - "or increasing file descriptors limit to " - "'1290'." - ) \ No newline at end of file From 53163366b6ca19b5bd4cc3aee636d027156a9052 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Redzy=C5=84ski?= Date: Mon, 16 Dec 2019 18:03:34 +0100 Subject: [PATCH 7/8] remote: s3: revert jobs adjustment, pass OSError to main --- dvc/exceptions.py | 10 --------- dvc/main.py | 5 +++++ dvc/remote/base.py | 11 +++++----- dvc/remote/local.py | 3 ++- dvc/remote/s3.py | 40 +----------------------------------- tests/func/test_remote.py | 11 +++++----- tests/unit/remote/test_s3.py | 24 ++-------------------- tests/utils/__init__.py | 11 ---------- 8 files changed, 21 insertions(+), 94 deletions(-) diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 1373179f51..1beb8d6e1c 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -344,13 +344,3 @@ def __init__(self, path, external_repo_path, external_repo_url): class HTTPError(DvcException): def __init__(self, code, reason): super(HTTPError, self).__init__("'{} {}'".format(code, reason)) - - -class TooManyOpenFilesError(DvcException): - def __init__(self, cause): - super(TooManyOpenFilesError, self).__init__( - "Operation failed due to too many open file descriptors. reduce " - "the number of jobs or increase open file descriptors limit to " - "prevent this.", - cause=cause, - ) diff --git a/dvc/main.py b/dvc/main.py index 86e1a9e0df..0b4f95344a 100644 --- a/dvc/main.py +++ b/dvc/main.py @@ -1,6 +1,7 @@ """Main entry point for dvc CLI.""" from __future__ import unicode_literals +import errno import logging from dvc import analytics @@ -64,6 +65,10 @@ def main(argv=None): "unicode is not supported in DVC for Python 2 " "(end-of-life January 1, 2020), please upgrade to Python 3" ) + elif isinstance(exc, OSError) and exc.errno == errno.EMFILE: + logger.exception( + "too many open files error, please increase you `ulimit`" + ) else: logger.exception("unexpected error") ret = 255 diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 5f184e4f3e..57a1846a66 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -1,4 +1,7 @@ from __future__ import unicode_literals + +import errno + from dvc.utils.compat import basestring, FileNotFoundError, str, urlparse import itertools @@ -19,7 +22,6 @@ DvcException, ConfirmRemoveError, DvcIgnoreInCollectedDirError, - TooManyOpenFilesError, ) from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo @@ -85,9 +87,6 @@ class RemoteBASE(object): state = StateNoop() - def adjust_jobs(self, jobs=None): - return jobs or self.JOBS - def __init__(self, repo, config): self.repo = repo @@ -523,8 +522,8 @@ def _save(self, path_info, checksum): def _handle_transfer_exception( self, from_info, to_info, exception, operation ): - if isinstance(exception, OSError) and exception.errno == 24: - raise TooManyOpenFilesError(exception) + if isinstance(exception, OSError) and exception.errno == errno.EMFILE: + raise exception msg = "failed to {} '{}' to '{}'".format(operation, from_info, to_info) logger.exception(msg) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index c98a87f906..9c68e3a808 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -338,7 +338,8 @@ def _process( func = remote.upload status = STATUS_NEW - jobs = remote.adjust_jobs(jobs) + if jobs is None: + jobs = remote.JOBS status_info = self.status( named_cache, diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 8d9fccb476..6366fc426c 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -276,7 +276,6 @@ def _upload(self, from_file, to_info, name=None, no_progress_bar=False): to_info.path, Callback=pbar.update, ExtraArgs=self.extra_args, - Config=self.transfer_config, ) def _download(self, from_info, to_file, name=None, no_progress_bar=False): @@ -290,11 +289,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): disable=no_progress_bar, total=total, bytes=True, desc=name ) as pbar: self.s3.download_file( - from_info.bucket, - from_info.path, - to_file, - Callback=pbar.update, - Config=self.transfer_config, + from_info.bucket, from_info.path, to_file, Callback=pbar.update ) def _generate_download_url(self, path_info, expires=3600): @@ -340,36 +335,3 @@ def _append_aws_grants_to_extra_args(self, config): ) self.extra_args[extra_args_key] = config.get(grant_option) - - @cached_property - def transfer_config(self): - from boto3.s3.transfer import TransferConfig - - return TransferConfig() - - def adjust_jobs(self, jobs=None): - if os.name == "nt": - import win32file - - descriptor_limit = win32file._getmaxstdio() - else: - import resource - - descriptor_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - threads_per_job = self.transfer_config.max_request_concurrency - fds_per_thread = 2 # file and socket - safety_margin = 10 - - jobs_declared = bool(jobs) - jobs = jobs or self.JOBS - - estimated_descriptors_num = jobs * threads_per_job * fds_per_thread - if estimated_descriptors_num <= descriptor_limit - safety_margin: - return jobs - - safe_jobs_number = (descriptor_limit - safety_margin) // ( - threads_per_job * fds_per_thread - ) - if not jobs_declared: - jobs = safe_jobs_number - return jobs diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index cf41a5f261..f3140b5f33 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -1,3 +1,4 @@ +import errno import os import shutil @@ -6,7 +7,6 @@ from mock import patch from dvc.config import Config -from dvc.exceptions import TooManyOpenFilesError from dvc.main import main from dvc.path_info import PathInfo from dvc.remote import RemoteLOCAL, RemoteConfig @@ -265,11 +265,12 @@ def test_raise_on_too_many_open_files(tmp_dir, dvc, tmp_path_factory, mocker): tmp_dir.dvc_gen({"file": "file content"}) - too_many_open_files_error = OSError() - mocker.patch.object(too_many_open_files_error, "errno", 24) mocker.patch.object( - RemoteLOCAL, "_upload", side_effect=too_many_open_files_error + RemoteLOCAL, + "_upload", + side_effect=OSError(errno.EMFILE, "Too many open files"), ) - with pytest.raises(TooManyOpenFilesError): + with pytest.raises(OSError) as e: dvc.push() + assert e.errno == errno.EMFILE diff --git a/tests/unit/remote/test_s3.py b/tests/unit/remote/test_s3.py index ddcf5e7f80..57fcfcec79 100644 --- a/tests/unit/remote/test_s3.py +++ b/tests/unit/remote/test_s3.py @@ -1,10 +1,8 @@ -import os - import pytest from dvc.config import ConfigError -from dvc.remote import RemoteS3 -from tests.utils import empty_caplog +from dvc.remote.s3 import RemoteS3 + bucket_name = "bucket-name" prefix = "some/prefix" @@ -56,21 +54,3 @@ def test_grants_mutually_exclusive_acl_error(grants): with pytest.raises(ConfigError): RemoteS3(None, config) - - -@pytest.mark.parametrize( - "default_jobs_number,expected_result", [(10, 10), (13, 12)] -) -def test_adjust_default_jobs_number( - mocker, caplog, default_jobs_number, expected_result -): - remote = RemoteS3(None, {}) - - if os.name == "nt": - mocker.patch("win32file._getmaxstdio", return_value=256) - else: - mocker.patch("resource.getrlimit", return_value=(256, 1024)) - mocker.patch.object(remote, "JOBS", default_jobs_number) - - with empty_caplog(caplog): - assert remote.adjust_jobs() == expected_result diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index d4569049a7..a40a91f2cf 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -45,14 +45,3 @@ def trees_equal(dir_path_1, dir_path_2): def to_posixpath(path): return path.replace("\\", "/") - - -class empty_caplog(object): - def __init__(self, caplog): - self.caplog = caplog - - def __enter__(self): - self.caplog.clear() - - def __exit__(self, exc_type, exc_val, exc_tb): - assert self.caplog.text == "" From ab548470f8032d37e759aab8a120208bb442e336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Redzy=C5=84ski?= Date: Tue, 17 Dec 2019 11:56:22 +0100 Subject: [PATCH 8/8] Update dvc/main.py Co-Authored-By: Ruslan Kuprieiev --- dvc/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/main.py b/dvc/main.py index 0b4f95344a..89c9dc775e 100644 --- a/dvc/main.py +++ b/dvc/main.py @@ -67,7 +67,7 @@ def main(argv=None): ) elif isinstance(exc, OSError) and exc.errno == errno.EMFILE: logger.exception( - "too many open files error, please increase you `ulimit`" + "too many open files, please increase your `ulimit`" ) else: logger.exception("unexpected error")