From 1511fd6dcc522c2cfbb33ff5daf08dff7b7a4617 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 20 Jul 2019 13:44:08 +0100 Subject: [PATCH 01/66] add preliminary tqdm work --- dvc/progress.py | 136 ++++++++++++++---------------------------------- setup.py | 1 + 2 files changed, 39 insertions(+), 98 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 31c882f52b..81d4ba6587 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -2,129 +2,68 @@ from __future__ import print_function from __future__ import unicode_literals +import logging +from tqdm import tqdm from dvc.utils.compat import str import sys import threading -CLEARLINE_PATTERN = "\r\x1b[K" - -class Progress(object): +class Progress(tqdm): """ Simple multi-target progress bar. """ - def __init__(self): - self._n_total = 0 - self._n_finished = 0 - self._lock = threading.Lock() - self._line = None + super(Progress, self).__init__( + total=0, + disable=logging.getLogger(__name__).getEffectiveLevel() >= logging.CRITICAL) + self.set_lock(threading.Lock()) + self._targets = {} + self.clearln() def set_n_total(self, total): """Sets total number of targets.""" - self._n_total = total - self._n_finished = 0 + self.reset(total) @property def is_finished(self): """Returns if all targets have finished.""" - return self._n_total == self._n_finished + return self.total == self.n def clearln(self): - self._print(CLEARLINE_PATTERN, end="") - - def _writeln(self, line): - self.clearln() - self._print(line, end="") - sys.stdout.flush() - - def reset(self): - with self._lock: - self._n_total = 0 - self._n_finished = 0 - self._line = None - - def refresh(self, line=None): - """Refreshes progress bar.""" - # Just go away if it is locked. Will update next time - if not self._lock.acquire(False): - return + self.display("") - if line is None: - line = self._line - - if sys.stdout.isatty() and line is not None: - self._writeln(line) - self._line = line - - self._lock.release() - - def update_target(self, name, current, total): + def update_target(self, name, current, total, auto_finish=False): """Updates progress bar for a specified target.""" - self.refresh(self._bar(name, current, total)) + if name in self._targets: + t = self._targets[name] + else: + with self.get_lock(): + # TODO: up to 10 nested bars + position = 1 + self._get_free_pos(self) % 10 + t = tqdm( + total=total, + desc=name, + leave=False, + position=position) + self._targets[name] = t + self.total += 1 + + t.update(current - t.n) + if auto_finish and t.n == t.total: + self.finish_target(name) def finish_target(self, name): """Finishes progress bar for a specified target.""" - # We have to write a msg about finished target - with self._lock: - pbar = self._bar(name, 100, 100) - - if sys.stdout.isatty(): - self.clearln() - - self._print(pbar) - - self._n_finished += 1 - self._line = None - - def _bar(self, target_name, current, total): - """ - Make a progress bar out of info, which looks like: - (1/2): [########################################] 100% master.zip - """ - bar_len = 30 - - if total is None: - state = 0 - percent = "?% " - else: - total = int(total) - state = int((100 * current) / total) if current < total else 100 - percent = str(state) + "% " - - if self._n_total > 1: - num = "({}/{}): ".format(self._n_finished + 1, self._n_total) - else: - num = "" - - n_sh = int((state * bar_len) / 100) - n_sp = bar_len - n_sh - pbar = "[" + "#" * n_sh + " " * n_sp + "] " - - return num + pbar + percent + target_name - - @staticmethod - def _print(*args, **kwargs): - import logging - - logger = logging.getLogger(__name__) - - if logger.getEffectiveLevel() == logging.CRITICAL: - return - - print(*args, **kwargs) - - def __enter__(self): - self._lock.acquire(True) - if self._line is not None: - self.clearln() - - def __exit__(self, typ, value, tbck): - if self._line is not None: - self.refresh() - self._lock.release() + t = self._targets.pop(name) + t.close() + # TODO: We have to write a msg about finished target + if self.total < 100: + # only if less that 100 items + print(t) + self.update() def __call__(self, seq, name="", total=None): if total is None: @@ -138,6 +77,7 @@ def __call__(self, seq, name="", total=None): class ProgressCallback(object): + # TODO: is this meant to be a thin wrapper for multi-progress, or just one bar? def __init__(self, total): self.total = total self.current = 0 diff --git a/setup.py b/setup.py index 9e98ace7ac..0e74366afa 100644 --- a/setup.py +++ b/setup.py @@ -61,6 +61,7 @@ def run(self): "funcy>=1.12", "pathspec>=0.5.9", "shortuuid>=0.5.0", + "tqdm>=4.32.1", "win-unicode-console>=0.5; sys_platform == 'win32'", ] From 0ebce9c1e1fc05bab73304c3d12ce0c640cd7dfd Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 21 Jul 2019 02:59:01 +0100 Subject: [PATCH 02/66] create dvc.progress.Tqdm, migrate dvc.remote.base --- dvc/progress.py | 41 +++++++++++++++++++++++++++++++++-------- dvc/remote/base.py | 24 +++++------------------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 81d4ba6587..1f4d029656 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -1,25 +1,50 @@ """Manages progress bars for dvc repo.""" - from __future__ import print_function -from __future__ import unicode_literals import logging from tqdm import tqdm +from threading import Lock -from dvc.utils.compat import str -import sys -import threading +class Tqdm(tqdm): + """ + maximum-compatibility tqdm-based progressbars + """ + def __init__( + self, + iterable=None, + disable=logging.getLogger( + __name__).getEffectiveLevel() >= logging.CRITICAL, + ascii=None, # TODO: True? + **kwargs): + """ + kwargs : anything accepted by `tqdm.tqdm()` + """ + super(Tqdm, self).__init__( + iterable=iterable, + disable=disable, + ascii=ascii, + **kwargs) + # self.set_lock(Lock()) + + def update_desc(self, desc, n=1): + """ + Calls `set_description(desc)` and `update(n)` + """ + self.set_description(desc, refresh=False) + self.update(n) class Progress(tqdm): """ Simple multi-target progress bar. + TODO: remove this class. """ def __init__(self): super(Progress, self).__init__( total=0, - disable=logging.getLogger(__name__).getEffectiveLevel() >= logging.CRITICAL) - self.set_lock(threading.Lock()) + disable=logging.getLogger( + __name__).getEffectiveLevel() >= logging.CRITICAL) + self.set_lock(Lock()) self._targets = {} self.clearln() @@ -77,7 +102,7 @@ def __call__(self, seq, name="", total=None): class ProgressCallback(object): - # TODO: is this meant to be a thin wrapper for multi-progress, or just one bar? + # TODO: remove this thin wrapper def __init__(self, total): self.total = total self.current = 0 diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 9a387774af..224744d522 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -19,7 +19,7 @@ ConfirmRemoveError, DvcIgnoreInCollectedDirError, ) -from dvc.progress import progress, ProgressCallback +from dvc.progress import Tqdm from dvc.utils import ( LARGE_DIR_SIZE, tmp_fname, @@ -159,7 +159,7 @@ def _calculate_checksums(self, file_infos): "This is only done once." ) logger.info(msg) - tasks = progress(tasks, total=len(file_infos)) + tasks = Tqdm(tasks, total=len(file_infos), unit="md5") checksums = { file_infos[index]: task for index, task in enumerate(tasks) @@ -444,9 +444,6 @@ def upload(self, from_info, to_info, name=None, no_progress_bar=False): name = name or from_info.name - if not no_progress_bar: - progress.update_target(name, 0, None) - try: self._upload( from_info.fspath, @@ -459,9 +456,6 @@ def upload(self, from_info, to_info, name=None, no_progress_bar=False): logger.exception(msg.format(from_info, to_info)) return 1 # 1 fail - if not no_progress_bar: - progress.finish_target(name) - return 0 def download( @@ -490,11 +484,6 @@ def download( name = name or to_info.name - if not no_progress_bar: - # real progress is not always available, - # lets at least show start and finish - progress.update_target(name, 0, None) - makedirs(to_info.parent, exist_ok=True, mode=dir_mode) tmp_file = tmp_fname(to_info) @@ -509,9 +498,6 @@ def download( move(tmp_file, to_info, mode=file_mode) - if not no_progress_bar: - progress.finish_target(name) - return 0 def open(self, path_info, mode="r", encoding=None): @@ -642,10 +628,10 @@ def cache_exists(self, checksums, jobs=None): Returns: A list with checksums that were found in the remote """ - progress_callback = ProgressCallback(len(checksums)) + pbar = Tqdm(total=len(checksums)) def exists_with_progress(chunks): - return self.batch_exists(chunks, callback=progress_callback) + return self.batch_exists(chunks, callback=pbar.update_desc) if self.no_traverse and hasattr(self, "batch_exists"): with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: @@ -654,7 +640,7 @@ def exists_with_progress(chunks): results = executor.map(exists_with_progress, chunks) in_remote = itertools.chain.from_iterable(results) ret = list(itertools.compress(checksums, in_remote)) - progress_callback.finish("") + pbar.close() return ret return list(set(checksums) & set(self.all())) From 70ee31329f16564cbbf07982393971d23313766f Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 27 Jul 2019 16:43:30 +0100 Subject: [PATCH 03/66] disable old progress --- dvc/logger.py | 11 ----------- dvc/progress.py | 38 +++++++++++-------------------------- tests/unit/test_progress.py | 2 +- 3 files changed, 12 insertions(+), 39 deletions(-) diff --git a/dvc/logger.py b/dvc/logger.py index 45432eed4b..59ac3e3f37 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -53,9 +53,6 @@ class ColorFormatter(logging.Formatter): ) def format(self, record): - if self._is_visible(record): - self._progress_aware() - if record.levelname == "INFO": return record.msg @@ -146,14 +143,6 @@ def _parse_exc(self, exc_info): return (exception, stack_trace) - def _progress_aware(self): - """Add a new line if progress bar hasn't finished""" - from dvc.progress import progress - - if not progress.is_finished: - progress._print() - progress.clearln() - class LoggerHandler(logging.StreamHandler): def handleError(self, record): diff --git a/dvc/progress.py b/dvc/progress.py index 1f4d029656..b5ccdeea09 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -40,10 +40,7 @@ class Progress(tqdm): TODO: remove this class. """ def __init__(self): - super(Progress, self).__init__( - total=0, - disable=logging.getLogger( - __name__).getEffectiveLevel() >= logging.CRITICAL) + super(Progress, self).__init__(total=0, disable=True) self.set_lock(Lock()) self._targets = {} self.clearln() @@ -58,39 +55,26 @@ def is_finished(self): return self.total == self.n def clearln(self): - self.display("") + pass def update_target(self, name, current, total, auto_finish=False): """Updates progress bar for a specified target.""" - if name in self._targets: - t = self._targets[name] - else: - with self.get_lock(): - # TODO: up to 10 nested bars - position = 1 + self._get_free_pos(self) % 10 - t = tqdm( - total=total, - desc=name, - leave=False, - position=position) - self._targets[name] = t - self.total += 1 - - t.update(current - t.n) - if auto_finish and t.n == t.total: + if total and self.total != total: + self.set_n_total(total) + self.set_postfix_str(name, refresh=False) + self.update(current - self.n) + if auto_finish and self.is_finished: self.finish_target(name) def finish_target(self, name): """Finishes progress bar for a specified target.""" - t = self._targets.pop(name) - t.close() # TODO: We have to write a msg about finished target - if self.total < 100: - # only if less that 100 items - print(t) - self.update() + self.set_postfix_str(name, refresh=False) + self.clearln() def __call__(self, seq, name="", total=None): + logger = logging.getLogger(__name__) + logger.warning("DeprecationWarning: create Tqdm() instance instead") if total is None: total = len(seq) diff --git a/tests/unit/test_progress.py b/tests/unit/test_progress.py index e617c1775b..802bcb5fbe 100644 --- a/tests/unit/test_progress.py +++ b/tests/unit/test_progress.py @@ -5,7 +5,7 @@ def test_quiet(caplog, capsys): with caplog.at_level(logging.CRITICAL, logger="dvc"): - progress._print("something") + progress.clearln() assert capsys.readouterr().out == "" From c4b9d7dba7fddcbd2c08825dd96ced01c92e8ddd Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 27 Jul 2019 17:29:15 +0100 Subject: [PATCH 04/66] minor bugfix --- dvc/progress.py | 2 ++ dvc/remote/http.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dvc/progress.py b/dvc/progress.py index b5ccdeea09..5fe0f60b92 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -41,6 +41,8 @@ class Progress(tqdm): """ def __init__(self): super(Progress, self).__init__(total=0, disable=True) + from time import time + self._time = time self.set_lock(Lock()) self._targets = {} self.clearln() diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 715465293a..f39c5a3e38 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -70,7 +70,7 @@ def batch_exists(self, path_infos, callback): for path_info in path_infos: results.append(self.exists(path_info)) - callback.update(str(path_info)) + callback(str(path_info)) return results From cc281551eb3e10451c2b7f08d083a9c8dcf0b939 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 14:50:39 +0100 Subject: [PATCH 05/66] tqdm repo.checkout --- dvc/repo/checkout.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 2d743e41d5..826a24a818 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -3,7 +3,7 @@ import logging from dvc.exceptions import CheckoutErrorSuggestGit -from dvc.progress import ProgressCallback +from dvc.progress import Tqdm logger = logging.getLogger(__name__) @@ -23,13 +23,6 @@ def get_all_files_numbers(stages): return sum(stage.get_all_files_number() for stage in stages) -def get_progress_callback(stages): - total_files_num = get_all_files_numbers(stages) - if total_files_num == 0: - return None - return ProgressCallback(total_files_num) - - def checkout(self, target=None, with_deps=False, force=False, recursive=False): from dvc.stage import StageFileDoesNotExistError, StageFileBadNameError @@ -44,7 +37,7 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): with self.state: _cleanup_unused_links(self, all_stages) - progress_callback = get_progress_callback(stages) + pbar = Tqdm(total=get_all_files_numbers(stages)) for stage in stages: if stage.locked: @@ -53,6 +46,5 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): " not going to be checked out.".format(path=stage.relpath) ) - stage.checkout(force=force, progress_callback=progress_callback) - if progress_callback: - progress_callback.finish("Checkout finished!") + stage.checkout(force=force, progress_callback=pbar.update_desc) + pbar.close() From 62cbeed8b38bc52bf72a3b42b77c67e6fae1a5a7 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 14:53:02 +0100 Subject: [PATCH 06/66] tqdm remote.http --- dvc/remote/http.py | 51 ++++++++++++++++------------------------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/dvc/remote/http.py b/dvc/remote/http.py index f39c5a3e38..911e9754c8 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -1,35 +1,20 @@ from __future__ import unicode_literals from dvc.scheme import Schemes - from dvc.utils.compat import open import threading import requests import logging -from dvc.progress import progress +from dvc.progress import Tqdm from dvc.exceptions import DvcException from dvc.config import Config from dvc.remote.base import RemoteBASE - logger = logging.getLogger(__name__) -class ProgressBarCallback(object): - def __init__(self, name, total): - self.name = name - self.total = total - self.current = 0 - self.lock = threading.Lock() - - def __call__(self, byts): - with self.lock: - self.current += byts - progress.update_target(self.name, self.current, self.total) - - class RemoteHTTP(RemoteBASE): scheme = Schemes.HTTP REQUEST_TIMEOUT = 10 @@ -43,24 +28,21 @@ def __init__(self, repo, config): self.path_info = self.path_cls(url) if url else None def _download(self, from_info, to_file, name=None, no_progress_bar=False): - callback = None - if not no_progress_bar: - total = self._content_length(from_info.url) - if total: - callback = ProgressBarCallback(name, total) - request = self._request("GET", from_info.url, stream=True) - - with open(to_file, "wb") as fd: - transferred_bytes = 0 - - for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): - fd.write(chunk) - fd.flush() - transferred_bytes += len(chunk) - - if callback: - callback(transferred_bytes) + total = self._content_length(from_info.url) + leave = False + #TODO: persistent progress only for "large" files? + #if total: + # leave = total > self.CHUNK_SIZE * 100 + + with Tqdm(total=total, leave=leave, + unit='B', unit_scale=True, unit_divisor=1024, miniters=1, + disable=no_progress_bar) as pbar: + with open(to_file, "wb") as fd: + for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): + fd.write(chunk) + fd.flush() + pbar.update(len(chunk)) def exists(self, path_info): return bool(self._request("HEAD", path_info.url)) @@ -75,7 +57,8 @@ def batch_exists(self, path_infos, callback): return results def _content_length(self, url): - return self._request("HEAD", url).headers.get("Content-Length") + res = self._request("HEAD", url).headers.get("Content-Length") + return int(res) if res else None def get_file_checksum(self, path_info): url = path_info.url From 6aee6b6b16440d26ad031000f96bd535873c1e32 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 14:54:37 +0100 Subject: [PATCH 07/66] add progress.Tqdm(bytes) --- dvc/progress.py | 7 +++++++ dvc/remote/http.py | 3 +-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 5fe0f60b92..3c483cceec 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -15,10 +15,17 @@ def __init__( disable=logging.getLogger( __name__).getEffectiveLevel() >= logging.CRITICAL, ascii=None, # TODO: True? + bytes=False, **kwargs): """ + bytes : adds unit='B', unit_scale=True, unit_divisor=1024, miniters=1 kwargs : anything accepted by `tqdm.tqdm()` """ + if bytes: + #kwargs = deepcopy(kwargs) + for k, v in dict(unit='B', unit_scale=True, unit_divisor=1024, + miniters=1): + kwargs.setdefault(k, v) super(Tqdm, self).__init__( iterable=iterable, disable=disable, diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 911e9754c8..e662b9b9cc 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -35,8 +35,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): #if total: # leave = total > self.CHUNK_SIZE * 100 - with Tqdm(total=total, leave=leave, - unit='B', unit_scale=True, unit_divisor=1024, miniters=1, + with Tqdm(total=total, leave=leave, bytes=True, disable=no_progress_bar) as pbar: with open(to_file, "wb") as fd: for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): From 3f0b85cf1533095e97c0136f00a867246fe23119 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 14:54:46 +0100 Subject: [PATCH 08/66] add progress.Tqdm(truncate) --- dvc/progress.py | 13 +++++++++++++ dvc/remote/http.py | 1 + 2 files changed, 14 insertions(+) diff --git a/dvc/progress.py b/dvc/progress.py index 3c483cceec..492692ced6 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -40,6 +40,19 @@ def update_desc(self, desc, n=1): self.set_description(desc, refresh=False) self.update(n) + @classmethod + def truncate(cls, s, max_len=10, end=True, fill="..."): + """ + Guarantee len(output) < max_lenself. + >>> truncate("hello", 4) + '...o' + """ + if len(s) <= max_len: + return s + if len(fill) > max_len: + return fill[-max_len:] if end else fill[:max_len] + i = max_len - len(fill) + return (fill + s[-i:]) if end else (s[:i] + fill) class Progress(tqdm): """ diff --git a/dvc/remote/http.py b/dvc/remote/http.py index e662b9b9cc..15747e679b 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -36,6 +36,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): # leave = total > self.CHUNK_SIZE * 100 with Tqdm(total=total, leave=leave, bytes=True, + desc=Tqdm.truncate(to_file, 10), disable=no_progress_bar) as pbar: with open(to_file, "wb") as fd: for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): From d460d7e668d10014c4bc8ccefdba5851af14be23 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:08:08 +0100 Subject: [PATCH 09/66] tqdm remote.s3, flake8 --- dvc/progress.py | 3 ++- dvc/remote/http.py | 7 +++--- dvc/remote/s3.py | 54 +++++++++++++++++----------------------------- 3 files changed, 25 insertions(+), 39 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 492692ced6..051ad855ba 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -22,7 +22,7 @@ def __init__( kwargs : anything accepted by `tqdm.tqdm()` """ if bytes: - #kwargs = deepcopy(kwargs) + # kwargs = deepcopy(kwargs) for k, v in dict(unit='B', unit_scale=True, unit_divisor=1024, miniters=1): kwargs.setdefault(k, v) @@ -54,6 +54,7 @@ def truncate(cls, s, max_len=10, end=True, fill="..."): i = max_len - len(fill) return (fill + s[-i:]) if end else (s[:i] + fill) + class Progress(tqdm): """ Simple multi-target progress bar. diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 15747e679b..bd7bfdfd61 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -3,7 +3,6 @@ from dvc.scheme import Schemes from dvc.utils.compat import open -import threading import requests import logging @@ -31,9 +30,9 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): request = self._request("GET", from_info.url, stream=True) total = self._content_length(from_info.url) leave = False - #TODO: persistent progress only for "large" files? - #if total: - # leave = total > self.CHUNK_SIZE * 100 + # TODO: persistent progress only for "large" files? + # if total: + # leave = total > self.CHUNK_SIZE * 100 with Tqdm(total=total, leave=leave, bytes=True, desc=Tqdm.truncate(to_file, 10), diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index d9adab21b4..344af86584 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals import os -import threading import logging import itertools from funcy import cached_property @@ -11,7 +10,7 @@ except ImportError: boto3 = None -from dvc.progress import progress +from dvc.progress import Tqdm from dvc.config import Config from dvc.remote.base import RemoteBASE from dvc.exceptions import DvcException, ETagMismatchError @@ -21,19 +20,6 @@ logger = logging.getLogger(__name__) -class Callback(object): - def __init__(self, name, total): - self.name = name - self.total = total - self.current = 0 - self.lock = threading.Lock() - - def __call__(self, byts): - with self.lock: - self.current += byts - progress.update_target(self.name, self.current, self.total) - - class RemoteS3(RemoteBASE): scheme = Schemes.S3 path_cls = CloudURLInfo @@ -228,24 +214,24 @@ def batch_exists(self, path_infos, callback): def _upload(self, from_file, to_info, name=None, no_progress_bar=False): total = os.path.getsize(from_file) - cb = None if no_progress_bar else Callback(name, total) - self.s3.upload_file( - from_file, - to_info.bucket, - to_info.path, - Callback=cb, - ExtraArgs=self.extra_args, - ) + desc = Tqdm.truncate(name) if name else None + with Tqdm(disable=no_progress_bar, total=total, bytes=True, + desc=desc) as pbar: + self.s3.upload_file( + from_file, + to_info.bucket, + to_info.path, + Callback=pbar.update, + ExtraArgs=self.extra_args, + ) def _download(self, from_info, to_file, name=None, no_progress_bar=False): - if no_progress_bar: - cb = None - else: - total = self.s3.head_object( - Bucket=from_info.bucket, Key=from_info.path - )["ContentLength"] - cb = Callback(name, total) - - self.s3.download_file( - from_info.bucket, from_info.path, to_file, Callback=cb - ) + total = self.s3.head_object( + Bucket=from_info.bucket, Key=from_info.path + )["ContentLength"] + desc = Tqdm.truncate(name) if name else None + with Tqdm(disable=no_progress_bar, total=total, bytes=True, + desc=desc) as pbar: + self.s3.download_file( + from_info.bucket, from_info.path, to_file, Callback=pbar.update + ) From f61e7d86a99b09a7bf7ef2d84d652b57b538e286 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:19:17 +0100 Subject: [PATCH 10/66] tqdm remote.azure --- dvc/progress.py | 5 +++++ dvc/remote/azure.py | 30 +++++++++++++----------------- dvc/remote/http.py | 2 +- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 051ad855ba..4d690b2a55 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -40,6 +40,11 @@ def update_desc(self, desc, n=1): self.set_description(desc, refresh=False) self.update(n) + def update_to(self, current, total=None): + if total: + self.total = total + self.update(current - self.n) + @classmethod def truncate(cls, s, max_len=10, end=True, fill="..."): """ diff --git a/dvc/remote/azure.py b/dvc/remote/azure.py index 0b8330ae3c..af756aae64 100644 --- a/dvc/remote/azure.py +++ b/dvc/remote/azure.py @@ -15,7 +15,7 @@ BlockBlobService = None from dvc.utils.compat import urlparse -from dvc.progress import progress +from dvc.progress import Tqdm from dvc.config import Config from dvc.remote.base import RemoteBASE from dvc.path_info import CloudURLInfo @@ -25,14 +25,6 @@ logger = logging.getLogger(__name__) -class Callback(object): - def __init__(self, name): - self.name = name - - def __call__(self, current, total): - progress.update_target(self.name, current, total) - - class RemoteAZURE(RemoteBASE): scheme = Schemes.AZURE path_cls = CloudURLInfo @@ -124,18 +116,22 @@ def list_cache_paths(self): def _upload( self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs ): - cb = None if no_progress_bar else Callback(name) - self.blob_service.create_blob_from_path( - to_info.bucket, to_info.path, from_file, progress_callback=cb - ) + desc = Tqdm.truncate(name, 10) if name else None + with Tqdm(desc=desc, disable=no_progress_bar) as pbar: + self.blob_service.create_blob_from_path( + to_info.bucket, to_info.path, from_file, + progress_callback=pbar.update_to + ) def _download( self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs ): - cb = None if no_progress_bar else Callback(name) - self.blob_service.get_blob_to_path( - from_info.bucket, from_info.path, to_file, progress_callback=cb - ) + desc = Tqdm.truncate(name, 10) if name else None + with Tqdm(desc=desc, disable=no_progress_bar) as pbar: + self.blob_service.get_blob_to_path( + from_info.bucket, from_info.path, to_file, + progress_callback=pbar.update_to + ) def open(self, path_info, mode="r", encoding=None): get_url = lambda: self._generate_download_url(path_info) # noqa: E731 diff --git a/dvc/remote/http.py b/dvc/remote/http.py index bd7bfdfd61..d125f52732 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -35,7 +35,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): # leave = total > self.CHUNK_SIZE * 100 with Tqdm(total=total, leave=leave, bytes=True, - desc=Tqdm.truncate(to_file, 10), + desc=Tqdm.truncate(to_file), disable=no_progress_bar) as pbar: with open(to_file, "wb") as fd: for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): From 85d4f7ebb244aec8991a69db0172f976a9174213 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:26:43 +0100 Subject: [PATCH 11/66] add progress.Tqdm(desc_truncate) --- dvc/progress.py | 9 +++++++-- dvc/remote/azure.py | 6 ++---- dvc/remote/http.py | 2 +- dvc/remote/s3.py | 6 ++---- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 4d690b2a55..f2eedda121 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -16,16 +16,21 @@ def __init__( __name__).getEffectiveLevel() >= logging.CRITICAL, ascii=None, # TODO: True? bytes=False, + desc_truncate=None, **kwargs): """ - bytes : adds unit='B', unit_scale=True, unit_divisor=1024, miniters=1 + bytes : shortcut for + `unit='B', unit_scale=True, unit_divisor=1024, miniters=1` + desc_truncate : like `desc` but will truncate to 10 chars kwargs : anything accepted by `tqdm.tqdm()` """ + # kwargs = deepcopy(kwargs) if bytes: - # kwargs = deepcopy(kwargs) for k, v in dict(unit='B', unit_scale=True, unit_divisor=1024, miniters=1): kwargs.setdefault(k, v) + if desc_truncate is not None: + kwargs.setdefault('desc', self.truncate(desc_truncate)) super(Tqdm, self).__init__( iterable=iterable, disable=disable, diff --git a/dvc/remote/azure.py b/dvc/remote/azure.py index af756aae64..5af5c31bc5 100644 --- a/dvc/remote/azure.py +++ b/dvc/remote/azure.py @@ -116,8 +116,7 @@ def list_cache_paths(self): def _upload( self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs ): - desc = Tqdm.truncate(name, 10) if name else None - with Tqdm(desc=desc, disable=no_progress_bar) as pbar: + with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: self.blob_service.create_blob_from_path( to_info.bucket, to_info.path, from_file, progress_callback=pbar.update_to @@ -126,8 +125,7 @@ def _upload( def _download( self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs ): - desc = Tqdm.truncate(name, 10) if name else None - with Tqdm(desc=desc, disable=no_progress_bar) as pbar: + with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: self.blob_service.get_blob_to_path( from_info.bucket, from_info.path, to_file, progress_callback=pbar.update_to diff --git a/dvc/remote/http.py b/dvc/remote/http.py index d125f52732..fe778cf7be 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -35,7 +35,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): # leave = total > self.CHUNK_SIZE * 100 with Tqdm(total=total, leave=leave, bytes=True, - desc=Tqdm.truncate(to_file), + desc_truncate=to_file, disable=no_progress_bar) as pbar: with open(to_file, "wb") as fd: for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 344af86584..382aba9ae5 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -214,9 +214,8 @@ def batch_exists(self, path_infos, callback): def _upload(self, from_file, to_info, name=None, no_progress_bar=False): total = os.path.getsize(from_file) - desc = Tqdm.truncate(name) if name else None with Tqdm(disable=no_progress_bar, total=total, bytes=True, - desc=desc) as pbar: + desc_truncate=name) as pbar: self.s3.upload_file( from_file, to_info.bucket, @@ -229,9 +228,8 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): total = self.s3.head_object( Bucket=from_info.bucket, Key=from_info.path )["ContentLength"] - desc = Tqdm.truncate(name) if name else None with Tqdm(disable=no_progress_bar, total=total, bytes=True, - desc=desc) as pbar: + desc_truncate=name) as pbar: self.s3.download_file( from_info.bucket, from_info.path, to_file, Callback=pbar.update ) From bac3d9343b2d1482a31699912a228301f38ae15f Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:34:28 +0100 Subject: [PATCH 12/66] tqdm remote.ssh.connection --- dvc/remote/ssh/connection.py | 48 ++++++++++++------------------------ 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/dvc/remote/ssh/connection.py b/dvc/remote/ssh/connection.py index 8362857b8c..1a0e9fb2e7 100644 --- a/dvc/remote/ssh/connection.py +++ b/dvc/remote/ssh/connection.py @@ -12,7 +12,7 @@ from dvc.utils import tmp_fname from dvc.utils.compat import ignore_file_not_found -from dvc.progress import progress +from dvc.progress import Tqdm from dvc.exceptions import DvcException from dvc.remote.base import RemoteCmdError @@ -29,21 +29,6 @@ def sizeof_fmt(num, suffix="B"): return "%.1f%s%s" % (num, "Y", suffix) -def percent_cb(name, complete, total): - """ Callback for updating target progress """ - logger.debug( - "{}: {} transferred out of {}".format( - name, sizeof_fmt(complete), sizeof_fmt(total) - ) - ) - progress.update_target(name, complete, total) - - -def create_cb(name): - """ Create callback function for multipart object """ - return lambda cur, tot: percent_cb(name, cur, tot) - - class SSHConnection: def __init__(self, host, *args, **kwargs): logger.debug( @@ -183,14 +168,14 @@ def remove(self, path): self._remove_file(path) def download(self, src, dest, no_progress_bar=False, progress_title=None): - if no_progress_bar: - self.sftp.get(src, dest) - else: - if not progress_title: - progress_title = os.path.basename(src) + if not progress_title: + progress_title = os.path.basename(src) - self.sftp.get(src, dest, callback=create_cb(progress_title)) - progress.finish_target(progress_title) + with Tqdm( + desc_truncate=progress_title, + disable=no_progress_bar + ) as pbar: + self.sftp.get(src, dest, callback=pbar.update_to) def move(self, src, dst): self.makedirs(posixpath.dirname(dst)) @@ -199,15 +184,14 @@ def move(self, src, dst): def upload(self, src, dest, no_progress_bar=False, progress_title=None): self.makedirs(posixpath.dirname(dest)) tmp_file = tmp_fname(dest) - - if no_progress_bar: - self.sftp.put(src, tmp_file) - else: - if not progress_title: - progress_title = posixpath.basename(dest) - - self.sftp.put(src, tmp_file, callback=create_cb(progress_title)) - progress.finish_target(progress_title) + if not progress_title: + progress_title = posixpath.basename(dest) + + with Tqdm( + desc_truncate=progress_title, + disable=no_progress_bar + ) as pbar: + self.sftp.put(src, tmp_file, callback=pbar.update_to) self.sftp.rename(tmp_file, dest) From efd8a070ff796260273ff257255f828be6a90799 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:39:35 +0100 Subject: [PATCH 13/66] tqdm remote.local --- dvc/remote/local/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index 816cdaa2e6..9de7c445ba 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -35,7 +35,7 @@ ) from dvc.config import Config from dvc.exceptions import DvcException -from dvc.progress import progress +from dvc.progress import Tqdm from concurrent.futures import ThreadPoolExecutor from dvc.path_info import PathInfo @@ -255,7 +255,7 @@ def move(self, from_info, to_info): def cache_exists(self, checksums, jobs=None): return [ checksum - for checksum in progress(checksums) + for checksum in Tqdm(checksums, unit='md5') if not self.changed_cache_file(checksum) ] @@ -348,8 +348,8 @@ def _get_plans(self, download, remote, status_info, status): cache = [] path_infos = [] names = [] - for md5, info in progress( - status_info.items(), name="Analysing status" + for md5, info in Tqdm( + status_info.items(), desc="Analysing status" ): if info["status"] == status: cache.append(self.checksum_to_path_info(md5)) @@ -542,7 +542,7 @@ def _update_unpacked_dir(self, checksum): def _create_unpacked_dir(self, checksum, dir_info, unpacked_dir_info): self.makedirs(unpacked_dir_info) - for entry in progress(dir_info, name="Created unpacked dir"): + for entry in Tqdm(dir_info, desc="Created unpacked dir"): entry_cache_info = self.checksum_to_path_info( entry[self.PARAM_CHECKSUM] ) From 82bb550e7dfa178545b9e91b8de8d9f3c5c065dc Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:54:54 +0100 Subject: [PATCH 14/66] tqdm dvc.utils --- dvc/utils/__init__.py | 73 ++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index af7356575c..3a247be82f 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -33,8 +33,8 @@ logger = logging.getLogger(__name__) -LOCAL_CHUNK_SIZE = 1024 * 1024 -LARGE_FILE_SIZE = 1024 * 1024 * 1024 +LOCAL_CHUNK_SIZE = 2 ** 20 # 1 MB +LARGE_FILE_SIZE = 2 ** 30 # 1 GB LARGE_DIR_SIZE = 100 @@ -44,7 +44,7 @@ def dos2unix(data): def file_md5(fname): """ get the (md5 hexdigest, md5 digest) of a file """ - from dvc.progress import progress + from dvc.progress import Tqdm from dvc.istextfile import istextfile if os.path.exists(fname): @@ -56,28 +56,25 @@ def file_md5(fname): bar = True msg = "Computing md5 for a large file {}. This is only done once." logger.info(msg.format(relpath(fname))) - name = relpath(fname) - total = 0 - - with open(fname, "rb") as fobj: - while True: - data = fobj.read(LOCAL_CHUNK_SIZE) - if not data: - break - - if bar: - total += len(data) - progress.update_target(name, total, size) - - if binary: - chunk = data - else: - chunk = dos2unix(data) - - hash_md5.update(chunk) - - if bar: - progress.finish_target(name) + name = relpath(fname) + + with Tqdm( + desc_truncate=name, disable=not bar, total=size, bytes=True, + leave=False + ) as pbar: + with open(fname, "rb") as fobj: + while True: + data = fobj.read(LOCAL_CHUNK_SIZE) + if not data: + break + + if binary: + chunk = data + else: + chunk = dos2unix(data) + + hash_md5.update(chunk) + pbar.update(len(data)) return (hash_md5.hexdigest(), hash_md5.digest()) @@ -119,10 +116,9 @@ def dict_md5(d, exclude=()): def copyfile(src, dest, no_progress_bar=False, name=None): """Copy file with progress bar""" from dvc.exceptions import DvcException - from dvc.progress import progress + from dvc.progress import Tqdm from dvc.system import System - copied = 0 name = name if name else os.path.basename(dest) total = os.stat(src).st_size @@ -132,18 +128,17 @@ def copyfile(src, dest, no_progress_bar=False, name=None): try: System.reflink(src, dest) except DvcException: - with open(src, "rb") as fsrc, open(dest, "wb+") as fdest: - while True: - buf = fsrc.read(LOCAL_CHUNK_SIZE) - if not buf: - break - fdest.write(buf) - copied += len(buf) - if not no_progress_bar: - progress.update_target(name, copied, total) - - if not no_progress_bar: - progress.finish_target(name) + with Tqdm( + desc_truncate=name, disable=no_progress_bar, total=total, + bytes=True, + ) as pbar: + with open(src, "rb") as fsrc, open(dest, "wb+") as fdest: + while True: + buf = fsrc.read(LOCAL_CHUNK_SIZE) + if not buf: + break + fdest.write(buf) + pbar.update(len(buf)) def makedirs(path, exist_ok=False, mode=None): From d6f816bc48e5131df7cd04ff256ed9526fd598d8 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 15:55:43 +0100 Subject: [PATCH 15/66] remove old progress - progress.Progress - progress.ProgressCallback - progress.progress --- dvc/progress.py | 71 ------------------------------------------------- 1 file changed, 71 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index f2eedda121..042c8c1bea 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -63,74 +63,3 @@ def truncate(cls, s, max_len=10, end=True, fill="..."): return fill[-max_len:] if end else fill[:max_len] i = max_len - len(fill) return (fill + s[-i:]) if end else (s[:i] + fill) - - -class Progress(tqdm): - """ - Simple multi-target progress bar. - TODO: remove this class. - """ - def __init__(self): - super(Progress, self).__init__(total=0, disable=True) - from time import time - self._time = time - self.set_lock(Lock()) - self._targets = {} - self.clearln() - - def set_n_total(self, total): - """Sets total number of targets.""" - self.reset(total) - - @property - def is_finished(self): - """Returns if all targets have finished.""" - return self.total == self.n - - def clearln(self): - pass - - def update_target(self, name, current, total, auto_finish=False): - """Updates progress bar for a specified target.""" - if total and self.total != total: - self.set_n_total(total) - self.set_postfix_str(name, refresh=False) - self.update(current - self.n) - if auto_finish and self.is_finished: - self.finish_target(name) - - def finish_target(self, name): - """Finishes progress bar for a specified target.""" - # TODO: We have to write a msg about finished target - self.set_postfix_str(name, refresh=False) - self.clearln() - - def __call__(self, seq, name="", total=None): - logger = logging.getLogger(__name__) - logger.warning("DeprecationWarning: create Tqdm() instance instead") - if total is None: - total = len(seq) - - self.update_target(name, 0, total) - for done, item in enumerate(seq, start=1): - yield item - self.update_target(name, done, total) - self.finish_target(name) - - -class ProgressCallback(object): - # TODO: remove this thin wrapper - def __init__(self, total): - self.total = total - self.current = 0 - progress.reset() - - def update(self, name, progress_to_add=1): - self.current += progress_to_add - progress.update_target(name, self.current, self.total) - - def finish(self, name): - progress.finish_target(name) - - -progress = Progress() # pylint: disable=invalid-name From c02807deb3fb691ea951a6a78d1d4db62d522742 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 16:01:18 +0100 Subject: [PATCH 16/66] tqdm remote.oss --- dvc/remote/oss.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dvc/remote/oss.py b/dvc/remote/oss.py index 79912edc36..6e493cc074 100644 --- a/dvc/remote/oss.py +++ b/dvc/remote/oss.py @@ -13,7 +13,7 @@ from dvc.config import Config from dvc.remote.base import RemoteBASE -from dvc.remote.azure import Callback +from dvc.progress import Tqdm from dvc.path_info import CloudURLInfo @@ -107,15 +107,15 @@ def list_cache_paths(self): def _upload( self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs ): - cb = None if no_progress_bar else Callback(name) - self.oss_service.put_object_from_file( - to_info.path, from_file, progress_callback=cb - ) + with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: + self.oss_service.put_object_from_file( + to_info.path, from_file, progress_callback=pbar.update_to + ) def _download( self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs ): - cb = None if no_progress_bar else Callback(name) - self.oss_service.get_object_to_file( - from_info.path, to_file, progress_callback=cb - ) + with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: + self.oss_service.get_object_to_file( + from_info.path, to_file, progress_callback=pbar.update_to + ) From ca68eb4a4b874da50897a94ee4df9b12b7dae3b8 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 16:04:23 +0100 Subject: [PATCH 17/66] add progress.Tqdm.update_desc(truncate=True) --- dvc/progress.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 042c8c1bea..0f5c567941 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -38,11 +38,13 @@ def __init__( **kwargs) # self.set_lock(Lock()) - def update_desc(self, desc, n=1): + def update_desc(self, desc, n=1, truncate=True): """ - Calls `set_description(desc)` and `update(n)` + Calls `set_description(truncate(desc))` and `update(n)` """ - self.set_description(desc, refresh=False) + self.set_description( + self.truncate(desc) if truncate else desc, refresh=False + ) self.update(n) def update_to(self, current, total=None): From aed3451df924a891f94d052e56f2f693e6237015 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 16:38:29 +0100 Subject: [PATCH 18/66] pylint --- dvc/progress.py | 7 ++----- dvc/remote/http.py | 4 +++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 0f5c567941..7f0ab98a37 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -2,7 +2,6 @@ from __future__ import print_function import logging from tqdm import tqdm -from threading import Lock class Tqdm(tqdm): @@ -14,8 +13,7 @@ def __init__( iterable=None, disable=logging.getLogger( __name__).getEffectiveLevel() >= logging.CRITICAL, - ascii=None, # TODO: True? - bytes=False, + bytes=False, # pylint: disable=W0622 desc_truncate=None, **kwargs): """ @@ -34,7 +32,6 @@ def __init__( super(Tqdm, self).__init__( iterable=iterable, disable=disable, - ascii=ascii, **kwargs) # self.set_lock(Lock()) @@ -49,7 +46,7 @@ def update_desc(self, desc, n=1, truncate=True): def update_to(self, current, total=None): if total: - self.total = total + self.total = total # pylint: disable=W0613 self.update(current - self.n) @classmethod diff --git a/dvc/remote/http.py b/dvc/remote/http.py index fe778cf7be..64648bdd14 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -29,13 +29,15 @@ def __init__(self, repo, config): def _download(self, from_info, to_file, name=None, no_progress_bar=False): request = self._request("GET", from_info.url, stream=True) total = self._content_length(from_info.url) + if name is None: + name = from_info.url leave = False # TODO: persistent progress only for "large" files? # if total: # leave = total > self.CHUNK_SIZE * 100 with Tqdm(total=total, leave=leave, bytes=True, - desc_truncate=to_file, + desc_truncate=name, disable=no_progress_bar) as pbar: with open(to_file, "wb") as fd: for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): From 9143f67ee049a1143a4e8e2da2c2e52652da193c Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 28 Jul 2019 16:47:25 +0100 Subject: [PATCH 19/66] black --- dvc/progress.py | 30 ++++++++++++++++-------------- dvc/remote/azure.py | 12 ++++++++---- dvc/remote/http.py | 10 +++++++--- dvc/remote/local/__init__.py | 2 +- dvc/remote/s3.py | 16 ++++++++++++---- dvc/remote/ssh/connection.py | 6 ++---- dvc/utils/__init__.py | 11 ++++++++--- 7 files changed, 54 insertions(+), 33 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 7f0ab98a37..17085a48a9 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -8,14 +8,16 @@ class Tqdm(tqdm): """ maximum-compatibility tqdm-based progressbars """ + def __init__( - self, - iterable=None, - disable=logging.getLogger( - __name__).getEffectiveLevel() >= logging.CRITICAL, - bytes=False, # pylint: disable=W0622 - desc_truncate=None, - **kwargs): + self, + iterable=None, + disable=logging.getLogger(__name__).getEffectiveLevel() + >= logging.CRITICAL, + bytes=False, # pylint: disable=W0622 + desc_truncate=None, + **kwargs + ): """ bytes : shortcut for `unit='B', unit_scale=True, unit_divisor=1024, miniters=1` @@ -24,15 +26,15 @@ def __init__( """ # kwargs = deepcopy(kwargs) if bytes: - for k, v in dict(unit='B', unit_scale=True, unit_divisor=1024, - miniters=1): + for k, v in dict( + unit="B", unit_scale=True, unit_divisor=1024, miniters=1 + ): kwargs.setdefault(k, v) if desc_truncate is not None: - kwargs.setdefault('desc', self.truncate(desc_truncate)) + kwargs.setdefault("desc", self.truncate(desc_truncate)) super(Tqdm, self).__init__( - iterable=iterable, - disable=disable, - **kwargs) + iterable=iterable, disable=disable, **kwargs + ) # self.set_lock(Lock()) def update_desc(self, desc, n=1, truncate=True): @@ -46,7 +48,7 @@ def update_desc(self, desc, n=1, truncate=True): def update_to(self, current, total=None): if total: - self.total = total # pylint: disable=W0613 + self.total = total # pylint: disable=W0613,W0201 self.update(current - self.n) @classmethod diff --git a/dvc/remote/azure.py b/dvc/remote/azure.py index 5af5c31bc5..5f8839d1ec 100644 --- a/dvc/remote/azure.py +++ b/dvc/remote/azure.py @@ -118,8 +118,10 @@ def _upload( ): with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: self.blob_service.create_blob_from_path( - to_info.bucket, to_info.path, from_file, - progress_callback=pbar.update_to + to_info.bucket, + to_info.path, + from_file, + progress_callback=pbar.update_to, ) def _download( @@ -127,8 +129,10 @@ def _download( ): with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: self.blob_service.get_blob_to_path( - from_info.bucket, from_info.path, to_file, - progress_callback=pbar.update_to + from_info.bucket, + from_info.path, + to_file, + progress_callback=pbar.update_to, ) def open(self, path_info, mode="r", encoding=None): diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 64648bdd14..68fa9b4fd3 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -36,9 +36,13 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): # if total: # leave = total > self.CHUNK_SIZE * 100 - with Tqdm(total=total, leave=leave, bytes=True, - desc_truncate=name, - disable=no_progress_bar) as pbar: + with Tqdm( + total=total, + leave=leave, + bytes=True, + desc_truncate=name, + disable=no_progress_bar, + ) as pbar: with open(to_file, "wb") as fd: for chunk in request.iter_content(chunk_size=self.CHUNK_SIZE): fd.write(chunk) diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index 9de7c445ba..22ec5377d8 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -255,7 +255,7 @@ def move(self, from_info, to_info): def cache_exists(self, checksums, jobs=None): return [ checksum - for checksum in Tqdm(checksums, unit='md5') + for checksum in Tqdm(checksums, unit="md5") if not self.changed_cache_file(checksum) ] diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 382aba9ae5..72eaf41d1e 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -214,8 +214,12 @@ def batch_exists(self, path_infos, callback): def _upload(self, from_file, to_info, name=None, no_progress_bar=False): total = os.path.getsize(from_file) - with Tqdm(disable=no_progress_bar, total=total, bytes=True, - desc_truncate=name) as pbar: + with Tqdm( + disable=no_progress_bar, + total=total, + bytes=True, + desc_truncate=name, + ) as pbar: self.s3.upload_file( from_file, to_info.bucket, @@ -228,8 +232,12 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): total = self.s3.head_object( Bucket=from_info.bucket, Key=from_info.path )["ContentLength"] - with Tqdm(disable=no_progress_bar, total=total, bytes=True, - desc_truncate=name) as pbar: + with Tqdm( + disable=no_progress_bar, + total=total, + bytes=True, + desc_truncate=name, + ) as pbar: self.s3.download_file( from_info.bucket, from_info.path, to_file, Callback=pbar.update ) diff --git a/dvc/remote/ssh/connection.py b/dvc/remote/ssh/connection.py index 1a0e9fb2e7..d30e84194e 100644 --- a/dvc/remote/ssh/connection.py +++ b/dvc/remote/ssh/connection.py @@ -172,8 +172,7 @@ def download(self, src, dest, no_progress_bar=False, progress_title=None): progress_title = os.path.basename(src) with Tqdm( - desc_truncate=progress_title, - disable=no_progress_bar + desc_truncate=progress_title, disable=no_progress_bar ) as pbar: self.sftp.get(src, dest, callback=pbar.update_to) @@ -188,8 +187,7 @@ def upload(self, src, dest, no_progress_bar=False, progress_title=None): progress_title = posixpath.basename(dest) with Tqdm( - desc_truncate=progress_title, - disable=no_progress_bar + desc_truncate=progress_title, disable=no_progress_bar ) as pbar: self.sftp.put(src, tmp_file, callback=pbar.update_to) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 3a247be82f..8254ee11da 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -59,8 +59,11 @@ def file_md5(fname): name = relpath(fname) with Tqdm( - desc_truncate=name, disable=not bar, total=size, bytes=True, - leave=False + desc_truncate=name, + disable=not bar, + total=size, + bytes=True, + leave=False, ) as pbar: with open(fname, "rb") as fobj: while True: @@ -129,7 +132,9 @@ def copyfile(src, dest, no_progress_bar=False, name=None): System.reflink(src, dest) except DvcException: with Tqdm( - desc_truncate=name, disable=no_progress_bar, total=total, + desc_truncate=name, + disable=no_progress_bar, + total=total, bytes=True, ) as pbar: with open(src, "rb") as fsrc, open(dest, "wb+") as fdest: From f88bf9218a4b3ae0dda08a4d55ede9785b4c270f Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 10 Aug 2019 14:41:08 +0100 Subject: [PATCH 20/66] tqdm auto-disabling with dynamic log level --- dvc/progress.py | 8 ++++++-- dvc/remote/local/__init__.py | 4 +--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 17085a48a9..8069ca0bc4 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -12,8 +12,7 @@ class Tqdm(tqdm): def __init__( self, iterable=None, - disable=logging.getLogger(__name__).getEffectiveLevel() - >= logging.CRITICAL, + disable=None, bytes=False, # pylint: disable=W0622 desc_truncate=None, **kwargs @@ -32,6 +31,11 @@ def __init__( kwargs.setdefault(k, v) if desc_truncate is not None: kwargs.setdefault("desc", self.truncate(desc_truncate)) + if disable is None: + disable = ( + logging.getLogger(__name__).getEffectiveLevel() + >= logging.CRITICAL + ) super(Tqdm, self).__init__( iterable=iterable, disable=disable, **kwargs ) diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index 22ec5377d8..7c50a64357 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -348,9 +348,7 @@ def _get_plans(self, download, remote, status_info, status): cache = [] path_infos = [] names = [] - for md5, info in Tqdm( - status_info.items(), desc="Analysing status" - ): + for md5, info in Tqdm(status_info.items(), desc="Analysing status"): if info["status"] == status: cache.append(self.checksum_to_path_info(md5)) path_infos.append(remote.checksum_to_path_info(md5)) From 56db2ee5fe5cccae72fdc26d1a2835df8d5f80f2 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 10 Aug 2019 14:54:51 +0100 Subject: [PATCH 21/66] misc minor restructuring --- dvc/remote/base.py | 30 +++++++++++++++++------------- dvc/remote/s3.py | 9 ++++++--- dvc/repo/checkout.py | 22 +++++++++++----------- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 224744d522..5b527806ed 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -11,6 +11,7 @@ from operator import itemgetter from multiprocessing import cpu_count from concurrent.futures import ThreadPoolExecutor +import functools import dvc.prompt as prompt from dvc.config import Config @@ -628,20 +629,23 @@ def cache_exists(self, checksums, jobs=None): Returns: A list with checksums that were found in the remote """ - pbar = Tqdm(total=len(checksums)) - - def exists_with_progress(chunks): - return self.batch_exists(chunks, callback=pbar.update_desc) - if self.no_traverse and hasattr(self, "batch_exists"): - with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: - path_infos = [self.checksum_to_path_info(x) for x in checksums] - chunks = to_chunks(path_infos, num_chunks=self.JOBS) - results = executor.map(exists_with_progress, chunks) - in_remote = itertools.chain.from_iterable(results) - ret = list(itertools.compress(checksums, in_remote)) - pbar.close() - return ret + with Tqdm(total=len(checksums)) as pbar: + exists_with_progress = functools.partial( + self.batch_exists, callback=pbar.update_desc + ) + + with ThreadPoolExecutor( + max_workers=jobs or self.JOBS + ) as executor: + path_infos = [ + self.checksum_to_path_info(x) for x in checksums + ] + chunks = to_chunks(path_infos, num_chunks=self.JOBS) + results = executor.map(exists_with_progress, chunks) + in_remote = itertools.chain.from_iterable(results) + ret = list(itertools.compress(checksums, in_remote)) + return ret return list(set(checksums) & set(self.all())) diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 72eaf41d1e..40f47c104e 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -229,9 +229,12 @@ def _upload(self, from_file, to_info, name=None, no_progress_bar=False): ) def _download(self, from_info, to_file, name=None, no_progress_bar=False): - total = self.s3.head_object( - Bucket=from_info.bucket, Key=from_info.path - )["ContentLength"] + if no_progress_bar: + total = None + else: + total = self.s3.head_object( + Bucket=from_info.bucket, Key=from_info.path + )["ContentLength"] with Tqdm( disable=no_progress_bar, total=total, diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 826a24a818..0ddbfc4d7d 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -37,14 +37,14 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): with self.state: _cleanup_unused_links(self, all_stages) - pbar = Tqdm(total=get_all_files_numbers(stages)) - - for stage in stages: - if stage.locked: - logger.warning( - "DVC-file '{path}' is locked. Its dependencies are" - " not going to be checked out.".format(path=stage.relpath) - ) - - stage.checkout(force=force, progress_callback=pbar.update_desc) - pbar.close() + with Tqdm(total=get_all_files_numbers(stages)) as pbar: + for stage in stages: + if stage.locked: + logger.warning( + "DVC-file '{path}' is locked. Its dependencies are" + " not going to be checked out.".format( + path=stage.relpath + ) + ) + + stage.checkout(force=force, progress_callback=pbar.update_desc) From 2ac99d87e56fb1d6175d54b33f8164372ebf4f60 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 10 Aug 2019 17:08:14 +0100 Subject: [PATCH 22/66] update dvc.progress test stub --- tests/unit/test_progress.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/tests/unit/test_progress.py b/tests/unit/test_progress.py index 802bcb5fbe..6e1a0cf815 100644 --- a/tests/unit/test_progress.py +++ b/tests/unit/test_progress.py @@ -1,19 +1,17 @@ import logging -import mock -from dvc.progress import progress, ProgressCallback +from dvc.progress import Tqdm def test_quiet(caplog, capsys): with caplog.at_level(logging.CRITICAL, logger="dvc"): - progress.clearln() - assert capsys.readouterr().out == "" - - -class TestProgressCallback: - @mock.patch("dvc.progress.progress") - def test_should_init_reset_progress(self, progress_mock): - total_files_num = 1 - - ProgressCallback(total_files_num) - - assert [mock.call.reset()] == progress_mock.method_calls + for _ in Tqdm(range(10)): + pass + out_err = capsys.readouterr() + assert out_err.out == "" + assert out_err.err == "" + with caplog.at_level(logging.INFO, logger="dvc"): + for _ in Tqdm(range(10)): + pass + out_err = capsys.readouterr() + assert out_err.out == "" + assert '0/10' in out_err.err From 1925d6338bb3de96c8437af912fcd4fd67f8f4f1 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 10 Aug 2019 17:13:58 +0100 Subject: [PATCH 23/66] fix silly error --- dvc/progress.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/progress.py b/dvc/progress.py index 8069ca0bc4..84ea9f4ed0 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -27,7 +27,7 @@ def __init__( if bytes: for k, v in dict( unit="B", unit_scale=True, unit_divisor=1024, miniters=1 - ): + ).items(): kwargs.setdefault(k, v) if desc_truncate is not None: kwargs.setdefault("desc", self.truncate(desc_truncate)) From 2f42db0eaa844d9353f34d87d2009d2b1534c3c2 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 00:01:40 +0100 Subject: [PATCH 24/66] fix base progress_callback --- dvc/remote/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 5b527806ed..c2b6b0d503 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -685,7 +685,7 @@ def _checkout_file( self.state.save_link(path_info) self.state.save(path_info, checksum) if progress_callback: - progress_callback.update(str(path_info)) + progress_callback(str(path_info)) def makedirs(self, path_info): raise NotImplementedError @@ -715,7 +715,7 @@ def _checkout_dir( self.link(entry_cache_info, entry_info) self.state.save(entry_info, entry_checksum) if progress_callback: - progress_callback.update(str(entry_info)) + progress_callback(str(entry_info)) self._remove_redundant_files(path_info, dir_info, force) From d0315a35097e53587bf7d4eec8e579f2943f1a3e Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 00:09:18 +0100 Subject: [PATCH 25/66] minor black, test fixes --- tests/func/test_remote.py | 7 ++++--- tests/unit/test_progress.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index dadae44ff3..a85aa4ae5f 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -162,15 +162,16 @@ def test(self): def test_large_dir_progress(repo_dir, dvc_repo): from dvc.utils import LARGE_DIR_SIZE - from dvc.progress import progress + from dvc.progress import Tqdm # Create a "large dir" for i in range(LARGE_DIR_SIZE + 1): repo_dir.create(os.path.join("gen", "{}.txt".format(i)), str(i)) - with patch.object(progress, "update_target") as update_target: + with patch.object(Tqdm, "truncate") as truncate: + assert not truncate.called dvc_repo.add("gen") - assert update_target.called + assert truncate.called def test_dir_checksum_should_be_key_order_agnostic(dvc_repo): diff --git a/tests/unit/test_progress.py b/tests/unit/test_progress.py index 6e1a0cf815..8149300c03 100644 --- a/tests/unit/test_progress.py +++ b/tests/unit/test_progress.py @@ -14,4 +14,4 @@ def test_quiet(caplog, capsys): pass out_err = capsys.readouterr() assert out_err.out == "" - assert '0/10' in out_err.err + assert "0/10" in out_err.err From ee4248df92a81d0442057f69048fdf5548549f92 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 01:17:42 +0100 Subject: [PATCH 26/66] fix and update more tests --- tests/func/test_checkout.py | 55 +++++++++++++++++++++---------------- tests/unit/test_logger.py | 2 +- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index 1aa7a83702..f4b0b6e33b 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -1,14 +1,13 @@ import os import sys import re - import shutil import filecmp import collections import logging from dvc.main import main -from dvc import progress +from dvc.progress import Tqdm from dvc.repo import Repo as DvcRepo from dvc.system import System from dvc.utils import walk_files, relpath @@ -410,7 +409,7 @@ def setUp(self): def test(self): with self._caplog.at_level(logging.INFO, logger="dvc"), patch.object( - sys, "stdout" + sys, "stderr" ) as stdout_mock: self.stdout_mock = logger.handlers[0].stream = stdout_mock @@ -422,31 +421,29 @@ def test(self): write_calls = self.filter_out_empty_write_calls(write_calls) self.write_args = [w_c[1][0] for w_c in write_calls] - pattern = re.compile(".*\\[.{30}\\].*%.*") - progress_bars = [ - arg - for arg in self.write_args - if pattern.match(arg) and "unpacked" not in arg - ] - + pattern = re.compile(r"%\|\W+\| .*\[.*\]") + progress_bars = filter(pattern.search, self.write_args) + progress_bars = [arg for arg in progress_bars if "unpacked" not in arg] update_bars = progress_bars[:-1] finish_bar = progress_bars[-1] - self.assertEqual(4, len(update_bars)) - assert re.search(".*\\[#{7} {23}\\] 25%.*", progress_bars[0]) - assert re.search(".*\\[#{15} {15}\\] 50%.*", progress_bars[1]) - assert re.search(".*\\[#{22} {8}\\] 75%.*", progress_bars[2]) - assert re.search(".*\\[#{30}\\] 100%.*", progress_bars[3]) + # at least the inital (blank) update_bar should be printed; + # but maybe no intermediate ones + self.assertLessEqual(1, len(update_bars)) + self.assertLessEqual(len(update_bars), 4) self.assertCaretReturnFollowsEach(update_bars) self.assertNewLineFollows(finish_bar) - self.assertAnyEndsWith(update_bars, self.FOO) - self.assertAnyEndsWith(update_bars, self.BAR) - self.assertAnyEndsWith(update_bars, self.DATA) - self.assertAnyEndsWith(update_bars, self.DATA_SUB) + # self.assertAnyEndsWith(update_bars, self.FOO) + # self.assertAnyEndsWith(update_bars, self.BAR) + # self.assertAnyEndsWith(update_bars, self.DATA) + # self.assertAnyEndsWith(update_bars, self.DATA_SUB) + self.assertAnyStartsWith( + [finish_bar], "\r" + Tqdm.truncate(self.DATA_SUB) + ) - self.assertTrue(finish_bar.endswith("Checkout finished!")) + # self.assertTrue(finish_bar.endswith("Checkout finished!")) def filter_out_empty_write_calls(self, calls): def is_not_empty_write(call): @@ -485,14 +482,22 @@ def _prepare_repo(self): def assertCaretReturnFollowsEach(self, update_bars): for update_bar in update_bars: - self.assertIn(update_bar, self.write_args) for index, arg in enumerate(self.write_args): if arg == update_bar: - self.assertEqual( - progress.CLEARLINE_PATTERN, self.write_args[index + 1] - ) + lines = 0 + for arg in self.write_args[index + 1:]: + if arg == "\n": + lines += 1 + elif arg == "\x1b[A": + lines -= 1 + elif arg.startswith('\r') and 'unpacked' in arg: + pass + else: + self.assertEqual(0, lines) + self.assertEqual("\r", arg[0]) + break def assertNewLineFollows(self, finish_bar): self.assertIn(finish_bar, self.write_args) @@ -504,6 +509,8 @@ def assertNewLineFollows(self, finish_bar): def assertAnyEndsWith(self, update_bars, name): self.assertTrue(any(ub for ub in update_bars if ub.endswith(name))) + def assertAnyStartsWith(self, update_bars, name): + self.assertTrue(any(ub for ub in update_bars if ub.startswith(name))) class TestCheckoutTargetRecursiveShouldNotRemoveOtherUsedFiles(TestDvc): def test(self): diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py index 12272c92f0..61da0f22ce 100644 --- a/tests/unit/test_logger.py +++ b/tests/unit/test_logger.py @@ -151,7 +151,7 @@ def test_nested_exceptions(self, caplog): assert expected == formatter.format(caplog.records[0]) def test_progress_awareness(self, mocker, capsys, caplog): - from dvc.progress import progress + from dvc.progress import Tqdm with mocker.patch("sys.stdout.isatty", return_value=True): progress.set_n_total(100) From 120dfff05a573ce42f1b542608daecfb961ef840 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 01:18:13 +0100 Subject: [PATCH 27/66] naive test update --- tests/func/test_checkout.py | 5 ++-- tests/unit/test_logger.py | 50 ++++++++++++++++++------------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index f4b0b6e33b..c453ffb5da 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -487,12 +487,12 @@ def assertCaretReturnFollowsEach(self, update_bars): for index, arg in enumerate(self.write_args): if arg == update_bar: lines = 0 - for arg in self.write_args[index + 1:]: + for arg in self.write_args[index + 1 :]: if arg == "\n": lines += 1 elif arg == "\x1b[A": lines -= 1 - elif arg.startswith('\r') and 'unpacked' in arg: + elif arg.startswith("\r") and "unpacked" in arg: pass else: self.assertEqual(0, lines) @@ -512,6 +512,7 @@ def assertAnyEndsWith(self, update_bars, name): def assertAnyStartsWith(self, update_bars, name): self.assertTrue(any(ub for ub in update_bars if ub.startswith(name))) + class TestCheckoutTargetRecursiveShouldNotRemoveOtherUsedFiles(TestDvc): def test(self): ret = main(["add", self.DATA_DIR, self.FOO, self.BAR]) diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py index 61da0f22ce..1d3ad2eddf 100644 --- a/tests/unit/test_logger.py +++ b/tests/unit/test_logger.py @@ -154,28 +154,28 @@ def test_progress_awareness(self, mocker, capsys, caplog): from dvc.progress import Tqdm with mocker.patch("sys.stdout.isatty", return_value=True): - progress.set_n_total(100) - progress.update_target("progress", 1, 10) - - # logging an invisible message should not break - # the progress bar output - with caplog.at_level(logging.INFO, logger="dvc"): - debug_record = logging.LogRecord( - name="dvc", - level=logging.DEBUG, - pathname=__name__, - lineno=1, - msg="debug", - args=(), - exc_info=None, - ) - - formatter.format(debug_record) - captured = capsys.readouterr() - assert "\n" not in captured.out - - # just when the message is actually visible - with caplog.at_level(logging.INFO, logger="dvc"): - logger.info("some info") - captured = capsys.readouterr() - assert "\n" in captured.out + with Tqdm(total=100, desc="progress") as pbar: + pbar.update() + + # logging an invisible message should not break + # the progress bar output + with caplog.at_level(logging.INFO, logger="dvc"): + debug_record = logging.LogRecord( + name="dvc", + level=logging.DEBUG, + pathname=__name__, + lineno=1, + msg="debug", + args=(), + exc_info=None, + ) + + formatter.format(debug_record) + captured = capsys.readouterr() + assert captured.out == "" + + # when the message is actually visible + with caplog.at_level(logging.INFO, logger="dvc"): + logger.info("some info") + captured = capsys.readouterr() + captured.out == "" From ea8f4ba231aafb39fe7cfcbfd9b41dbf8c8d77d8 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 01:26:01 +0100 Subject: [PATCH 28/66] slight test update --- tests/unit/test_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_logger.py b/tests/unit/test_logger.py index 1d3ad2eddf..57c876b3d3 100644 --- a/tests/unit/test_logger.py +++ b/tests/unit/test_logger.py @@ -178,4 +178,4 @@ def test_progress_awareness(self, mocker, capsys, caplog): with caplog.at_level(logging.INFO, logger="dvc"): logger.info("some info") captured = capsys.readouterr() - captured.out == "" + assert captured.out == "" From 7665782d93165c025ec05640754687253140f4cb Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 01:40:22 +0100 Subject: [PATCH 29/66] fix more callback usage --- dvc/remote/gs.py | 2 +- dvc/remote/s3.py | 2 +- dvc/remote/ssh/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py index b171b9c4ee..3bb495fd85 100644 --- a/dvc/remote/gs.py +++ b/dvc/remote/gs.py @@ -94,7 +94,7 @@ def batch_exists(self, path_infos, callback): for path_info in path_infos: paths.append(self._list_paths(path_info.bucket, path_info.path)) - callback.update(str(path_info)) + callback(str(path_info)) paths = set(itertools.chain.from_iterable(paths)) diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 40f47c104e..f86d0277c6 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -207,7 +207,7 @@ def batch_exists(self, path_infos, callback): for path_info in path_infos: paths.append(self._list_paths(path_info.bucket, path_info.path)) - callback.update(str(path_info)) + callback(str(path_info)) paths = set(itertools.chain.from_iterable(paths)) return [path_info.path in paths for path_info in path_infos] diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index 22a7da5aad..993ad563fb 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -160,7 +160,7 @@ def _exists(chunk_and_channel): if exc.errno != errno.ENOENT: raise ret.append(False) - callback.update(path) + callback(path) return ret with self.ssh(path_infos[0]) as ssh: From 7328ff48669d861891cb769aeb871b873e3148dc Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 01:41:01 +0100 Subject: [PATCH 30/66] update tqdm requirement --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0e74366afa..7680ac8d63 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ def run(self): "funcy>=1.12", "pathspec>=0.5.9", "shortuuid>=0.5.0", - "tqdm>=4.32.1", + "tqdm>=4.32.2", "win-unicode-console>=0.5; sys_platform == 'win32'", ] From 0de87af5fd77b2e317e8da1360e40982bf70f0f6 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 23:11:06 +0100 Subject: [PATCH 31/66] progress-aware logging --- dvc/logger.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/dvc/logger.py b/dvc/logger.py index 59ac3e3f37..93644c4033 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals from dvc.utils.compat import str, StringIO +from dvc.progress import Tqdm import logging import logging.handlers @@ -149,6 +150,17 @@ def handleError(self, record): super(LoggerHandler, self).handleError(record) raise LoggingException(record) + def emit(self, record): + """Write to Tqdm's stream so as to not break progressbars""" + try: + msg = self.format(record) + Tqdm.write(msg) + self.flush() + except (KeyboardInterrupt, SystemExit): + raise + except: # noqa pylint: disable=bare-except + self.handleError(record) + def setup(level=logging.INFO): colorama.init() From 56d1cca6bcc7fd4c5f3c722948da1f64272f2767 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 23:23:16 +0100 Subject: [PATCH 32/66] log to correct stream --- dvc/logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/logger.py b/dvc/logger.py index 93644c4033..28fbff9110 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -154,7 +154,7 @@ def emit(self, record): """Write to Tqdm's stream so as to not break progressbars""" try: msg = self.format(record) - Tqdm.write(msg) + Tqdm.write(msg, file=self.stream) self.flush() except (KeyboardInterrupt, SystemExit): raise From 032d507ede4b51f214de570c1e2bb04db509f3a1 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 11 Aug 2019 23:34:57 +0100 Subject: [PATCH 33/66] persistent progress for large http files --- dvc/remote/http.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 68fa9b4fd3..5942a6c0dc 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals from dvc.scheme import Schemes +from dvc.utils import LARGE_FILE_SIZE from dvc.utils.compat import open import requests @@ -31,14 +32,10 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): total = self._content_length(from_info.url) if name is None: name = from_info.url - leave = False - # TODO: persistent progress only for "large" files? - # if total: - # leave = total > self.CHUNK_SIZE * 100 with Tqdm( total=total, - leave=leave, + leave=False, bytes=True, desc_truncate=name, disable=no_progress_bar, @@ -48,6 +45,9 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): fd.write(chunk) fd.flush() pbar.update(len(chunk)) + # print completed progress bar for large file sizes + if pbar.n > LARGE_FILE_SIZE: + Tqdm.write(str(pbar)) def exists(self, path_info): return bool(self._request("HEAD", path_info.url)) From 1d617459e48270f3359ff7b151bd7a27855f903f Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 12 Aug 2019 00:05:31 +0100 Subject: [PATCH 34/66] ensure nested bars are cleared away --- dvc/progress.py | 19 +++++++++++++++++++ dvc/remote/base.py | 9 +++++---- dvc/remote/local/__init__.py | 5 ++--- dvc/remote/ssh/__init__.py | 4 ++-- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 84ea9f4ed0..b39eaf890e 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -2,6 +2,25 @@ from __future__ import print_function import logging from tqdm import tqdm +from concurrent.futures import ThreadPoolExecutor + + +class TqdmThreadPoolExecutor(ThreadPoolExecutor): + """ + Ensure worker progressbars are cleared away properly. + """ + + def __enter__(self): + """ + Creates a blank initial dummy progress bar so that workers are forced to + create "nested" bars. + """ + self.blank_bar = Tqdm(bar_format="Multi-Threaded:", leave=False) + super(TqdmThreadPoolExecutor, self).__enter__(self) + + def __exit__(self, *a, **k): + super(TqdmThreadPoolExecutor, self).__exit__(*a, **k) + self.blank_bar.close() class Tqdm(tqdm): diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c2b6b0d503..9a2aeea35b 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -10,7 +10,6 @@ import itertools from operator import itemgetter from multiprocessing import cpu_count -from concurrent.futures import ThreadPoolExecutor import functools import dvc.prompt as prompt @@ -20,7 +19,7 @@ ConfirmRemoveError, DvcIgnoreInCollectedDirError, ) -from dvc.progress import Tqdm +from dvc.progress import Tqdm, TqdmThreadPoolExecutor from dvc.utils import ( LARGE_DIR_SIZE, tmp_fname, @@ -151,7 +150,9 @@ def get_file_checksum(self, path_info): def _calculate_checksums(self, file_infos): file_infos = list(file_infos) - with ThreadPoolExecutor(max_workers=self.checksum_jobs) as executor: + with TqdmThreadPoolExecutor( + max_workers=self.checksum_jobs + ) as executor: tasks = executor.map(self.get_file_checksum, file_infos) if len(file_infos) > LARGE_DIR_SIZE: @@ -635,7 +636,7 @@ def cache_exists(self, checksums, jobs=None): self.batch_exists, callback=pbar.update_desc ) - with ThreadPoolExecutor( + with TqdmThreadPoolExecutor( max_workers=jobs or self.JOBS ) as executor: path_infos = [ diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index 7c50a64357..0572e5945e 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -35,8 +35,7 @@ ) from dvc.config import Config from dvc.exceptions import DvcException -from dvc.progress import Tqdm -from concurrent.futures import ThreadPoolExecutor +from dvc.progress import Tqdm, TqdmThreadPoolExecutor from dvc.path_info import PathInfo @@ -406,7 +405,7 @@ def _process( return 0 if jobs > 1: - with ThreadPoolExecutor(max_workers=jobs) as executor: + with TqdmThreadPoolExecutor(max_workers=jobs) as executor: fails = sum(executor.map(func, *plans)) else: fails = sum(map(func, *plans)) diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index 993ad563fb..c817eabaa3 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -5,7 +5,6 @@ import logging import itertools import errno -from concurrent.futures import ThreadPoolExecutor import threading try: @@ -20,6 +19,7 @@ from dvc.remote.base import RemoteBASE from dvc.scheme import Schemes from dvc.remote.pool import get_connection +from dvc.progress import TqdmThreadPoolExecutor from .connection import SSHConnection @@ -167,7 +167,7 @@ def _exists(chunk_and_channel): channels = ssh.open_max_sftp_channels() max_workers = len(channels) - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with TqdmThreadPoolExecutor(max_workers=max_workers) as executor: paths = [path_info.path for path_info in path_infos] chunks = to_chunks(paths, num_chunks=max_workers) chunks_and_channels = zip(chunks, channels) From f834e728e2635c91f95bd234c9dd2ffca7699ee0 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 12 Aug 2019 00:18:30 +0100 Subject: [PATCH 35/66] fix flake8 --- dvc/progress.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index b39eaf890e..fd7dff3e2c 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -12,8 +12,8 @@ class TqdmThreadPoolExecutor(ThreadPoolExecutor): def __enter__(self): """ - Creates a blank initial dummy progress bar so that workers are forced to - create "nested" bars. + Creates a blank initial dummy progress bar so that workers are forced + to create "nested" bars. """ self.blank_bar = Tqdm(bar_format="Multi-Threaded:", leave=False) super(TqdmThreadPoolExecutor, self).__enter__(self) From fbaadcb20fa951ff5706fce24613b3134a722130 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 12 Aug 2019 01:49:44 +0100 Subject: [PATCH 36/66] fix silly error --- dvc/progress.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/progress.py b/dvc/progress.py index fd7dff3e2c..d93a960668 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -16,7 +16,7 @@ def __enter__(self): to create "nested" bars. """ self.blank_bar = Tqdm(bar_format="Multi-Threaded:", leave=False) - super(TqdmThreadPoolExecutor, self).__enter__(self) + return super(TqdmThreadPoolExecutor, self).__enter__() def __exit__(self, *a, **k): super(TqdmThreadPoolExecutor, self).__exit__(*a, **k) From f373655cd5de8af9d8d882d055914940104b7019 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 13 Aug 2019 22:49:46 +0100 Subject: [PATCH 37/66] minor potential kwargs bugfix --- dvc/progress.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dvc/progress.py b/dvc/progress.py index d93a960668..bb7b02fa11 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -2,6 +2,7 @@ from __future__ import print_function import logging from tqdm import tqdm +from copy import deepcopy from concurrent.futures import ThreadPoolExecutor @@ -42,7 +43,7 @@ def __init__( desc_truncate : like `desc` but will truncate to 10 chars kwargs : anything accepted by `tqdm.tqdm()` """ - # kwargs = deepcopy(kwargs) + kwargs = deepcopy(kwargs) if bytes: for k, v in dict( unit="B", unit_scale=True, unit_divisor=1024, miniters=1 From 838151dc8f2188ee4edfccf0a7ce518257f9d8d6 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 13 Aug 2019 22:55:56 +0100 Subject: [PATCH 38/66] tidy & avoid excessive requests --- dvc/remote/http.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 5942a6c0dc..9c6aafea22 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -29,15 +29,13 @@ def __init__(self, repo, config): def _download(self, from_info, to_file, name=None, no_progress_bar=False): request = self._request("GET", from_info.url, stream=True) - total = self._content_length(from_info.url) - if name is None: - name = from_info.url + total = self._content_length(request) with Tqdm( total=total, leave=False, bytes=True, - desc_truncate=name, + desc_truncate=from_info.url if name is None else name, disable=no_progress_bar, ) as pbar: with open(to_file, "wb") as fd: @@ -46,7 +44,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): fd.flush() pbar.update(len(chunk)) # print completed progress bar for large file sizes - if pbar.n > LARGE_FILE_SIZE: + if (total or pbar.n) > LARGE_FILE_SIZE: Tqdm.write(str(pbar)) def exists(self, path_info): @@ -61,15 +59,19 @@ def batch_exists(self, path_infos, callback): return results - def _content_length(self, url): - res = self._request("HEAD", url).headers.get("Content-Length") + def _content_length(self, url_or_request): + headers = getattr( + url_or_request, + "headers", + self._request("HEAD", url_or_request).headers, + ) + res = headers.get("Content-Length") return int(res) if res else None def get_file_checksum(self, path_info): url = path_info.url - etag = self._request("HEAD", url).headers.get("ETag") or self._request( - "HEAD", url - ).headers.get("Content-MD5") + headers = self._request("HEAD", url).headers + etag = headers.get("ETag") or headers.get("Content-MD5") if not etag: raise DvcException( From 381114117711bbac83d8dd45a5f608147c3287db Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 13 Aug 2019 22:59:12 +0100 Subject: [PATCH 39/66] minor tidy --- dvc/remote/ssh/connection.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dvc/remote/ssh/connection.py b/dvc/remote/ssh/connection.py index d30e84194e..7243b694f9 100644 --- a/dvc/remote/ssh/connection.py +++ b/dvc/remote/ssh/connection.py @@ -168,11 +168,9 @@ def remove(self, path): self._remove_file(path) def download(self, src, dest, no_progress_bar=False, progress_title=None): - if not progress_title: - progress_title = os.path.basename(src) - with Tqdm( - desc_truncate=progress_title, disable=no_progress_bar + desc_truncate=progress_title or os.path.basename(src), + disable=no_progress_bar, ) as pbar: self.sftp.get(src, dest, callback=pbar.update_to) From a1f5c0368303444533f54590ff389cb8aaffa524 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 13 Aug 2019 23:38:30 +0100 Subject: [PATCH 40/66] tidy some ThreadPoolExecutor --- dvc/remote/base.py | 6 ++++-- dvc/remote/ssh/__init__.py | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 396c06696c..e355f63c6f 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -11,6 +11,7 @@ from operator import itemgetter from multiprocessing import cpu_count from functools import partial +from concurrent.futures import ThreadPoolExecutor import dvc.prompt as prompt from dvc.config import Config @@ -155,8 +156,8 @@ def _calculate_checksums(self, file_infos): "This is only done once." ) logger.info(msg) - tasks = Tqdm(tasks, total=len(file_infos), unit="md5") + tasks = Tqdm(tasks, total=len(file_infos), unit="md5") checksums = { file_infos[index]: task for index, task in enumerate(tasks) } @@ -632,12 +633,13 @@ def cache_exists(self, checksums, jobs=None): return list(set(checksums) & set(self.all())) with Tqdm(total=len(checksums)) as pbar: + def exists_with_progress(path_info): ret = self.exists(path_info) pbar.update_desc(str(path_info)) return ret - with TqdmThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: + with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: path_infos = [self.checksum_to_path_info(x) for x in checksums] in_remote = executor.map(exists_with_progress, path_infos) ret = list(itertools.compress(checksums, in_remote)) diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index fe76eb6bf2..e0b8f45e4a 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -252,6 +252,7 @@ def cache_exists(self, checksums, jobs=None): return list(set(checksums) & set(self.all())) with Tqdm(total=len(checksums)) as pbar: + def exists_with_progress(chunks): return self.batch_exists(chunks, callback=pbar.update_desc) From d9b4ecfe7b542a00fd8e890fdfac173f319cbdab Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 13 Aug 2019 23:46:20 +0100 Subject: [PATCH 41/66] better auto-closing TqdmThreadPoolExecutor --- dvc/progress.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index bb7b02fa11..def8fd4ec2 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -13,15 +13,19 @@ class TqdmThreadPoolExecutor(ThreadPoolExecutor): def __enter__(self): """ - Creates a blank initial dummy progress bar so that workers are forced - to create "nested" bars. + Creates a blank initial dummy progress bar if needed so that workers are + forced to create "nested" bars. """ - self.blank_bar = Tqdm(bar_format="Multi-Threaded:", leave=False) + blank_bar = Tqdm(bar_format="Multi-Threaded:", leave=False) + if blank_bar.pos > 0: + # already nested - don't need a placeholder bar + blank_bar.close() + self.bar = blank_bar return super(TqdmThreadPoolExecutor, self).__enter__() def __exit__(self, *a, **k): super(TqdmThreadPoolExecutor, self).__exit__(*a, **k) - self.blank_bar.close() + self.bar.close() class Tqdm(tqdm): From c6f73fdc65d7e3b89f541d92d91880858ba3b307 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 13 Aug 2019 23:47:44 +0100 Subject: [PATCH 42/66] flake8 --- dvc/progress.py | 4 ++-- dvc/remote/http.py | 1 - dvc/remote/s3.py | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index def8fd4ec2..2e8020fe75 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -13,8 +13,8 @@ class TqdmThreadPoolExecutor(ThreadPoolExecutor): def __enter__(self): """ - Creates a blank initial dummy progress bar if needed so that workers are - forced to create "nested" bars. + Creates a blank initial dummy progress bar if needed so that workers + are forced to create "nested" bars. """ blank_bar = Tqdm(bar_format="Multi-Threaded:", leave=False) if blank_bar.pos > 0: diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 64891bd16b..2a8f378b9f 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -4,7 +4,6 @@ from dvc.utils import LARGE_FILE_SIZE from dvc.utils.compat import open -import threading import requests import logging diff --git a/dvc/remote/s3.py b/dvc/remote/s3.py index 547a339693..c1bf272db7 100644 --- a/dvc/remote/s3.py +++ b/dvc/remote/s3.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals import os -import threading import logging from funcy import cached_property From f88c2fe5fcd17ff12d1dc18c3cd4e8d26eb198f7 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Wed, 14 Aug 2019 00:18:47 +0100 Subject: [PATCH 43/66] remove old test --- tests/func/test_checkout.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index c453ffb5da..bf86e917fe 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -7,7 +7,6 @@ import logging from dvc.main import main -from dvc.progress import Tqdm from dvc.repo import Repo as DvcRepo from dvc.system import System from dvc.utils import walk_files, relpath @@ -439,10 +438,9 @@ def test(self): # self.assertAnyEndsWith(update_bars, self.BAR) # self.assertAnyEndsWith(update_bars, self.DATA) # self.assertAnyEndsWith(update_bars, self.DATA_SUB) - self.assertAnyStartsWith( - [finish_bar], "\r" + Tqdm.truncate(self.DATA_SUB) - ) - + # self.assertAnyStartsWith( + # [finish_bar], "\r" + Tqdm.truncate(self.DATA_SUB) + # ) # self.assertTrue(finish_bar.endswith("Checkout finished!")) def filter_out_empty_write_calls(self, calls): From 186f0cc483c9d38a4aeabf998754cf87249170e5 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Wed, 14 Aug 2019 00:46:34 +0100 Subject: [PATCH 44/66] a little tidy --- dvc/remote/base.py | 18 +++++++++--------- dvc/remote/http.py | 3 ++- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index e355f63c6f..b73e24b893 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -151,16 +151,16 @@ def _calculate_checksums(self, file_infos): tasks = executor.map(self.get_file_checksum, file_infos) if len(file_infos) > LARGE_DIR_SIZE: - msg = ( - "Computing md5 for a large number of files. " - "This is only done once." + logger.info( + ( + "Computing md5 for a large number of files. " + "This is only done once." + ) ) - logger.info(msg) - - tasks = Tqdm(tasks, total=len(file_infos), unit="md5") - checksums = { - file_infos[index]: task for index, task in enumerate(tasks) - } + tasks = Tqdm(tasks, total=len(file_infos), unit="md5") + checksums = { + file_infos[index]: task for index, task in enumerate(tasks) + } return checksums def _collect_dir(self, path_info): diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 2a8f378b9f..4373fe788e 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -44,7 +44,8 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): fd.flush() pbar.update(len(chunk)) # print completed progress bar for large file sizes - if (total or pbar.n) > LARGE_FILE_SIZE: + pbar.n = total or pbar.n + if pbar.n > LARGE_FILE_SIZE: Tqdm.write(str(pbar)) def exists(self, path_info): From 39da3725f9b8e6842f06954c562873d0a8ff731a Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Wed, 14 Aug 2019 00:56:20 +0100 Subject: [PATCH 45/66] fix silly request header error --- dvc/remote/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 4373fe788e..1c0449cbd0 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -29,7 +29,7 @@ def __init__(self, repo, config): def _download(self, from_info, to_file, name=None, no_progress_bar=False): request = self._request("GET", from_info.url, stream=True) - total = self._content_length(request) + total = self._content_length(from_info) with Tqdm( total=total, From 8955a606c4acaf3a567daacb2cdf82118da6afbc Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Wed, 14 Aug 2019 01:11:17 +0100 Subject: [PATCH 46/66] add .mailmap --- .mailmap | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .mailmap diff --git a/.mailmap b/.mailmap new file mode 100644 index 0000000000..a8eae84140 --- /dev/null +++ b/.mailmap @@ -0,0 +1,6 @@ +Paweł Redzyński +Dmitry Petrov +Earl Hathaway +Nabanita Dash +Kurian Benoy +Sritanu Chakraborty From b018933b5abb8e11f5c42aa88c0e12f413a9da37 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Fri, 16 Aug 2019 22:26:14 +0100 Subject: [PATCH 47/66] fix py2 MagicMock(sys.stderr).encoding in tests --- tests/func/test_checkout.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index bf86e917fe..d08db4ac3a 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -407,11 +407,15 @@ def setUp(self): self._prepare_repo() def test(self): + encoding = sys.stderr.encoding with self._caplog.at_level(logging.INFO, logger="dvc"), patch.object( sys, "stderr" ) as stdout_mock: self.stdout_mock = logger.handlers[0].stream = stdout_mock + if sys.version_info[:1] < (3,): + sys.stderr.encoding = encoding + ret = main(["checkout"]) self.assertEqual(0, ret) From adf05e6ff0f6345ddd14cfc8f7b96fd449074ddc Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Fri, 16 Aug 2019 22:32:47 +0100 Subject: [PATCH 48/66] minor rename --- tests/func/test_checkout.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index d08db4ac3a..91bdeac1f5 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -410,8 +410,8 @@ def test(self): encoding = sys.stderr.encoding with self._caplog.at_level(logging.INFO, logger="dvc"), patch.object( sys, "stderr" - ) as stdout_mock: - self.stdout_mock = logger.handlers[0].stream = stdout_mock + ) as stderr_mock: + self.stderr_mock = logger.handlers[0].stream = stderr_mock if sys.version_info[:1] < (3,): sys.stderr.encoding = encoding @@ -419,7 +419,7 @@ def test(self): ret = main(["checkout"]) self.assertEqual(0, ret) - stdout_calls = self.stdout_mock.method_calls + stdout_calls = self.stderr_mock.method_calls write_calls = self.filter_out_non_write_calls(stdout_calls) write_calls = self.filter_out_empty_write_calls(write_calls) self.write_args = [w_c[1][0] for w_c in write_calls] From 60566b380e00ce0a5cfccbd0b8890e6c42bb3429 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Fri, 16 Aug 2019 22:32:58 +0100 Subject: [PATCH 49/66] minor test update --- tests/func/test_remote.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index a85aa4ae5f..b938f97f42 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -168,10 +168,10 @@ def test_large_dir_progress(repo_dir, dvc_repo): for i in range(LARGE_DIR_SIZE + 1): repo_dir.create(os.path.join("gen", "{}.txt".format(i)), str(i)) - with patch.object(Tqdm, "truncate") as truncate: - assert not truncate.called + with patch.object(Tqdm, "update") as update: + assert not update.called dvc_repo.add("gen") - assert truncate.called + assert update.called def test_dir_checksum_should_be_key_order_agnostic(dvc_repo): From 534d6fdb0cb4e1f36e57f206734272d9211ceed4 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 15:42:13 +0100 Subject: [PATCH 50/66] remove defunct test, extraneous comments - https://github.com/iterative/dvc/pull/2333#discussion_r314927966 --- dvc/progress.py | 1 - tests/func/test_checkout.py | 114 ------------------------------------ 2 files changed, 115 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index 2e8020fe75..e3fd74200f 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -63,7 +63,6 @@ def __init__( super(Tqdm, self).__init__( iterable=iterable, disable=disable, **kwargs ) - # self.set_lock(Lock()) def update_desc(self, desc, n=1, truncate=True): """ diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index 91bdeac1f5..0bda6a5f72 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -401,120 +401,6 @@ def test(self): self.assertIsNone(exc.cause.cause) -class TestCheckoutShouldHaveSelfClearingProgressBar(TestDvc): - def setUp(self): - super(TestCheckoutShouldHaveSelfClearingProgressBar, self).setUp() - self._prepare_repo() - - def test(self): - encoding = sys.stderr.encoding - with self._caplog.at_level(logging.INFO, logger="dvc"), patch.object( - sys, "stderr" - ) as stderr_mock: - self.stderr_mock = logger.handlers[0].stream = stderr_mock - - if sys.version_info[:1] < (3,): - sys.stderr.encoding = encoding - - ret = main(["checkout"]) - self.assertEqual(0, ret) - - stdout_calls = self.stderr_mock.method_calls - write_calls = self.filter_out_non_write_calls(stdout_calls) - write_calls = self.filter_out_empty_write_calls(write_calls) - self.write_args = [w_c[1][0] for w_c in write_calls] - - pattern = re.compile(r"%\|\W+\| .*\[.*\]") - progress_bars = filter(pattern.search, self.write_args) - progress_bars = [arg for arg in progress_bars if "unpacked" not in arg] - update_bars = progress_bars[:-1] - finish_bar = progress_bars[-1] - - # at least the inital (blank) update_bar should be printed; - # but maybe no intermediate ones - self.assertLessEqual(1, len(update_bars)) - self.assertLessEqual(len(update_bars), 4) - - self.assertCaretReturnFollowsEach(update_bars) - self.assertNewLineFollows(finish_bar) - - # self.assertAnyEndsWith(update_bars, self.FOO) - # self.assertAnyEndsWith(update_bars, self.BAR) - # self.assertAnyEndsWith(update_bars, self.DATA) - # self.assertAnyEndsWith(update_bars, self.DATA_SUB) - # self.assertAnyStartsWith( - # [finish_bar], "\r" + Tqdm.truncate(self.DATA_SUB) - # ) - # self.assertTrue(finish_bar.endswith("Checkout finished!")) - - def filter_out_empty_write_calls(self, calls): - def is_not_empty_write(call): - assert call[0] == "write" - return call[1][0] != "" - - return list(filter(is_not_empty_write, calls)) - - def filter_out_non_write_calls(self, calls): - def is_write_call(call): - return call[0] == "write" - - return list(filter(is_write_call, calls)) - - def _prepare_repo(self): - storage = self.mkdtemp() - - ret = main(["remote", "add", "-d", "myremote", storage]) - self.assertEqual(0, ret) - - ret = main(["add", self.DATA_DIR]) - self.assertEqual(0, ret) - - ret = main(["add", self.FOO]) - self.assertEqual(0, ret) - - ret = main(["add", self.BAR]) - self.assertEqual(0, ret) - - ret = main(["push"]) - self.assertEqual(0, ret) - - shutil.rmtree(self.DATA_DIR) - os.unlink(self.FOO) - os.unlink(self.BAR) - - def assertCaretReturnFollowsEach(self, update_bars): - for update_bar in update_bars: - self.assertIn(update_bar, self.write_args) - - for index, arg in enumerate(self.write_args): - if arg == update_bar: - lines = 0 - for arg in self.write_args[index + 1 :]: - if arg == "\n": - lines += 1 - elif arg == "\x1b[A": - lines -= 1 - elif arg.startswith("\r") and "unpacked" in arg: - pass - else: - self.assertEqual(0, lines) - self.assertEqual("\r", arg[0]) - break - - def assertNewLineFollows(self, finish_bar): - self.assertIn(finish_bar, self.write_args) - - for index, arg in enumerate(self.write_args): - if arg == finish_bar: - self.assertEqual("\n", self.write_args[index + 1]) - - def assertAnyEndsWith(self, update_bars, name): - self.assertTrue(any(ub for ub in update_bars if ub.endswith(name))) - - def assertAnyStartsWith(self, update_bars, name): - self.assertTrue(any(ub for ub in update_bars if ub.startswith(name))) - - class TestCheckoutTargetRecursiveShouldNotRemoveOtherUsedFiles(TestDvc): def test(self): ret = main(["add", self.DATA_DIR, self.FOO, self.BAR]) From ac4d03e66f2fc7c9c30abfbeae38275b227d1689 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 15:48:18 +0100 Subject: [PATCH 51/66] remove auto-persistent bars https://github.com/iterative/dvc/pull/2333/files#r314927686 --- dvc/remote/http.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 1c0449cbd0..5d24e99ade 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -29,10 +29,8 @@ def __init__(self, repo, config): def _download(self, from_info, to_file, name=None, no_progress_bar=False): request = self._request("GET", from_info.url, stream=True) - total = self._content_length(from_info) - with Tqdm( - total=total, + total=None if no_progress_bar else self._content_length(from_info), leave=False, bytes=True, desc_truncate=from_info.url if name is None else name, @@ -43,10 +41,6 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False): fd.write(chunk) fd.flush() pbar.update(len(chunk)) - # print completed progress bar for large file sizes - pbar.n = total or pbar.n - if pbar.n > LARGE_FILE_SIZE: - Tqdm.write(str(pbar)) def exists(self, path_info): return bool(self._request("HEAD", path_info.url)) From 093ad20a84dcc83c49a61d241ad737df7544a8bd Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 16:06:18 +0100 Subject: [PATCH 52/66] flake8 unused imports --- dvc/remote/http.py | 1 - tests/func/test_checkout.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/dvc/remote/http.py b/dvc/remote/http.py index 5d24e99ade..07c0756ef7 100644 --- a/dvc/remote/http.py +++ b/dvc/remote/http.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals from dvc.scheme import Schemes -from dvc.utils import LARGE_FILE_SIZE from dvc.utils.compat import open import requests diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index 0bda6a5f72..d489a71b9a 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -1,6 +1,4 @@ import os -import sys -import re import shutil import filecmp import collections From 3098460f567707fad5d5d63015f60e9798e83d61 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 16:36:59 +0100 Subject: [PATCH 53/66] misc minor style fixes & optimisations --- .gitignore | 1 + dvc/remote/local/__init__.py | 16 ++++++++-------- dvc/remote/ssh/__init__.py | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 514e9ba8bb..d2a206fe3d 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ innosetup/config.ini *.exe .coverage +.coverage.* *.swp diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index 0572e5945e..e0e0cb3d61 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -334,7 +334,8 @@ def status( return ret - def _fill_statuses(self, checksum_info_dir, local_exists, remote_exists): + @staticmethod + def _fill_statuses(checksum_info_dir, local_exists, remote_exists): # Using sets because they are way faster for lookups local = set(local_exists) remote = set(remote_exists) @@ -436,7 +437,8 @@ def pull(self, checksum_infos, remote, jobs=None, show_checksums=False): download=True, ) - def _log_missing_caches(self, checksum_info_dict): + @staticmethod + def _log_missing_caches(checksum_info_dict): missing_caches = [ (md5, info) for md5, info in checksum_info_dict.items() @@ -444,10 +446,8 @@ def _log_missing_caches(self, checksum_info_dict): ] if missing_caches: missing_desc = "".join( - [ - "\nname: {}, md5: {}".format(info["name"], md5) - for md5, info in missing_caches - ] + "\nname: {}, md5: {}".format(info["name"], md5) + for md5, info in missing_caches ) msg = ( "Some of the cache files do not exist neither locally " @@ -479,8 +479,8 @@ def _unprotect_file(path): os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE) def _unprotect_dir(self, path): - for path in walk_files(path, self.repo.dvcignore): - RemoteLOCAL._unprotect_file(path) + for fname in walk_files(path, self.repo.dvcignore): + RemoteLOCAL._unprotect_file(fname) def unprotect(self, path_info): path = path_info.fspath diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index e0b8f45e4a..826a685912 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -93,7 +93,7 @@ def ssh_config_filename(): @staticmethod def _load_user_ssh_config(hostname): user_config_file = RemoteSSH.ssh_config_filename() - user_ssh_config = dict() + user_ssh_config = {} if hostname and os.path.exists(user_config_file): ssh_config = paramiko.SSHConfig() with open(user_config_file) as f: From 16113c880ad6001d021de1c5563f486c8543d650 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 16:51:15 +0100 Subject: [PATCH 54/66] update to bleeding-edge StreamHandler.emit - https://github.com/iterative/dvc/pull/2333#discussion_r314950294 --- dvc/logger.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dvc/logger.py b/dvc/logger.py index 28fbff9110..f9fe77b8d1 100644 --- a/dvc/logger.py +++ b/dvc/logger.py @@ -154,11 +154,13 @@ def emit(self, record): """Write to Tqdm's stream so as to not break progressbars""" try: msg = self.format(record) - Tqdm.write(msg, file=self.stream) + Tqdm.write( + msg, file=self.stream, end=getattr(self, "terminator", "\n") + ) self.flush() - except (KeyboardInterrupt, SystemExit): + except RecursionError: raise - except: # noqa pylint: disable=bare-except + except Exception: self.handleError(record) From 49d4930c391fa640c848abb20ed83f58e0076d8f Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 22:05:48 +0100 Subject: [PATCH 55/66] add Tqdm units --- dvc/remote/azure.py | 8 ++++++-- dvc/remote/base.py | 2 +- dvc/remote/local/__init__.py | 6 ++++-- dvc/remote/ssh/__init__.py | 2 +- dvc/remote/ssh/connection.py | 3 ++- dvc/repo/checkout.py | 2 +- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/dvc/remote/azure.py b/dvc/remote/azure.py index c8dcd9777b..2527b66bad 100644 --- a/dvc/remote/azure.py +++ b/dvc/remote/azure.py @@ -115,7 +115,9 @@ def list_cache_paths(self): def _upload( self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs ): - with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: + with Tqdm( + desc_truncate=name, disable=no_progress_bar, bytes=True + ) as pbar: self.blob_service.create_blob_from_path( to_info.bucket, to_info.path, @@ -126,7 +128,9 @@ def _upload( def _download( self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs ): - with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: + with Tqdm( + desc_truncate=name, disable=no_progress_bar, bytes=True + ) as pbar: self.blob_service.get_blob_to_path( from_info.bucket, from_info.path, diff --git a/dvc/remote/base.py b/dvc/remote/base.py index b73e24b893..6790a9a650 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -632,7 +632,7 @@ def cache_exists(self, checksums, jobs=None): if not self.no_traverse: return list(set(checksums) & set(self.all())) - with Tqdm(total=len(checksums)) as pbar: + with Tqdm(total=len(checksums), unit="md5") as pbar: def exists_with_progress(path_info): ret = self.exists(path_info) diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index e0e0cb3d61..3c22cbda95 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -348,7 +348,9 @@ def _get_plans(self, download, remote, status_info, status): cache = [] path_infos = [] names = [] - for md5, info in Tqdm(status_info.items(), desc="Analysing status"): + for md5, info in Tqdm( + status_info.items(), desc="Analysing status", unit="file" + ): if info["status"] == status: cache.append(self.checksum_to_path_info(md5)) path_infos.append(remote.checksum_to_path_info(md5)) @@ -539,7 +541,7 @@ def _update_unpacked_dir(self, checksum): def _create_unpacked_dir(self, checksum, dir_info, unpacked_dir_info): self.makedirs(unpacked_dir_info) - for entry in Tqdm(dir_info, desc="Created unpacked dir"): + for entry in Tqdm(dir_info, desc="Creating unpacked dir", unit="dir"): entry_cache_info = self.checksum_to_path_info( entry[self.PARAM_CHECKSUM] ) diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index 826a685912..2e18c5f5e2 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -251,7 +251,7 @@ def cache_exists(self, checksums, jobs=None): if not self.no_traverse: return list(set(checksums) & set(self.all())) - with Tqdm(total=len(checksums)) as pbar: + with Tqdm(total=len(checksums), unit="md5") as pbar: def exists_with_progress(chunks): return self.batch_exists(chunks, callback=pbar.update_desc) diff --git a/dvc/remote/ssh/connection.py b/dvc/remote/ssh/connection.py index 7243b694f9..fbb0c2b7bf 100644 --- a/dvc/remote/ssh/connection.py +++ b/dvc/remote/ssh/connection.py @@ -171,6 +171,7 @@ def download(self, src, dest, no_progress_bar=False, progress_title=None): with Tqdm( desc_truncate=progress_title or os.path.basename(src), disable=no_progress_bar, + bytes=True, ) as pbar: self.sftp.get(src, dest, callback=pbar.update_to) @@ -185,7 +186,7 @@ def upload(self, src, dest, no_progress_bar=False, progress_title=None): progress_title = posixpath.basename(dest) with Tqdm( - desc_truncate=progress_title, disable=no_progress_bar + desc_truncate=progress_title, disable=no_progress_bar, bytes=True ) as pbar: self.sftp.put(src, tmp_file, callback=pbar.update_to) diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 0ddbfc4d7d..2b4d036aa5 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -37,7 +37,7 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): with self.state: _cleanup_unused_links(self, all_stages) - with Tqdm(total=get_all_files_numbers(stages)) as pbar: + with Tqdm(total=get_all_files_numbers(stages), unit="file") as pbar: for stage in stages: if stage.locked: logger.warning( From bd1fe67f99f36b99fd4a0c1d01913f71e073b0c3 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 17 Aug 2019 22:07:14 +0100 Subject: [PATCH 56/66] probably correct oss2 Tqdm units (sparse Mandarin documentation) --- dvc/remote/oss.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dvc/remote/oss.py b/dvc/remote/oss.py index 108c1bc495..57e900087f 100644 --- a/dvc/remote/oss.py +++ b/dvc/remote/oss.py @@ -107,7 +107,9 @@ def list_cache_paths(self): def _upload( self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs ): - with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: + with Tqdm( + desc_truncate=name, disable=no_progress_bar, bytes=True + ) as pbar: self.oss_service.put_object_from_file( to_info.path, from_file, progress_callback=pbar.update_to ) @@ -115,7 +117,9 @@ def _upload( def _download( self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs ): - with Tqdm(desc_truncate=name, disable=no_progress_bar) as pbar: + with Tqdm( + desc_truncate=name, disable=no_progress_bar, bytes=True + ) as pbar: self.oss_service.get_object_to_file( from_info.path, to_file, progress_callback=pbar.update_to ) From 34dc163329d83b3bf42f2dbe65a9db0cd3be9e80 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 18 Aug 2019 00:19:34 +0100 Subject: [PATCH 57/66] Tqdm auto leave=False for nested bars --- dvc/progress.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dvc/progress.py b/dvc/progress.py index e3fd74200f..04e075e3d8 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -39,6 +39,7 @@ def __init__( disable=None, bytes=False, # pylint: disable=W0622 desc_truncate=None, + leave=None, **kwargs ): """ @@ -63,6 +64,7 @@ def __init__( super(Tqdm, self).__init__( iterable=iterable, disable=disable, **kwargs ) + self.leave = self.pos == 0 if leave is None else leave def update_desc(self, desc, n=1, truncate=True): """ From 9a2dea86c834959d3b74bc33140efcc64b7a5d4c Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 18 Aug 2019 00:19:49 +0100 Subject: [PATCH 58/66] Tqdm longer descriptions --- dvc/progress.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/progress.py b/dvc/progress.py index 04e075e3d8..e74262233d 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -81,7 +81,7 @@ def update_to(self, current, total=None): self.update(current - self.n) @classmethod - def truncate(cls, s, max_len=10, end=True, fill="..."): + def truncate(cls, s, max_len=25, end=True, fill="..."): """ Guarantee len(output) < max_lenself. >>> truncate("hello", 4) From f90fd179b1895130d4b1f02c08c94f8046170bf1 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 18 Aug 2019 23:43:35 +0100 Subject: [PATCH 59/66] update tqdm version --- dvc/progress.py | 3 +-- setup.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dvc/progress.py b/dvc/progress.py index e74262233d..04d8d953d5 100644 --- a/dvc/progress.py +++ b/dvc/progress.py @@ -62,9 +62,8 @@ def __init__( >= logging.CRITICAL ) super(Tqdm, self).__init__( - iterable=iterable, disable=disable, **kwargs + iterable=iterable, disable=disable, leave=leave, **kwargs ) - self.leave = self.pos == 0 if leave is None else leave def update_desc(self, desc, n=1, truncate=True): """ diff --git a/setup.py b/setup.py index 7680ac8d63..e98ef6be8c 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ def run(self): "funcy>=1.12", "pathspec>=0.5.9", "shortuuid>=0.5.0", - "tqdm>=4.32.2", + "tqdm>=4.34.0", "win-unicode-console>=0.5; sys_platform == 'win32'", ] From 13e76de5e99a72375e78362d90b6f6c4cd765891 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 18 Aug 2019 23:53:04 +0100 Subject: [PATCH 60/66] suppress MD5 description prefix --- dvc/remote/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 6790a9a650..62dbba7051 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -636,7 +636,7 @@ def cache_exists(self, checksums, jobs=None): def exists_with_progress(path_info): ret = self.exists(path_info) - pbar.update_desc(str(path_info)) + pbar.update() return ret with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor: From 6102caa3093e879ad7f5f23ad612c25c35e226d7 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 19 Aug 2019 00:27:38 +0100 Subject: [PATCH 61/66] hopefully temp fix for uncalled checkout progress_callback --- dvc/repo/checkout.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 2b4d036aa5..3430f94367 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -37,7 +37,10 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): with self.state: _cleanup_unused_links(self, all_stages) - with Tqdm(total=get_all_files_numbers(stages), unit="file") as pbar: + total = get_all_files_numbers(stages) + with Tqdm( + total=total, unit="file", desc="Checkout", disable=total == 0 + ) as pbar: for stage in stages: if stage.locked: logger.warning( @@ -47,4 +50,8 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): ) ) + done = pbar.n stage.checkout(force=force, progress_callback=pbar.update_desc) + if done == pbar.n: + # callback not called + pbar.update(stage.get_all_files_number()) From bee90646a71faa0de47c71c471dbd84f439d1169 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 19 Aug 2019 11:39:50 +0100 Subject: [PATCH 62/66] fix uncalled `progress_callback`s --- dvc/output/base.py | 1 + dvc/remote/base.py | 18 ++++++++++++++++-- dvc/remote/ssh/__init__.py | 2 ++ dvc/repo/checkout.py | 5 +---- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/dvc/output/base.py b/dvc/output/base.py index 8b55bd71f3..4932660413 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -277,6 +277,7 @@ def download(self, to): def checkout(self, force=False, progress_callback=None, tag=None): if not self.use_cache: + progress_callback(str(self.path_info), self.get_files_number()) return if tag: diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 62dbba7051..59d3d70ed0 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -739,23 +739,28 @@ def checkout( raise NotImplementedError checksum = checksum_info.get(self.PARAM_CHECKSUM) + skip = False if not checksum: logger.warning( "No checksum info found for '{}'. " "It won't be created.".format(str(path_info)) ) self.safe_remove(path_info, force=force) - return + skip = True if not self.changed(path_info, checksum_info): msg = "Data '{}' didn't change." logger.debug(msg.format(str(path_info))) - return + skip = True if self.changed_cache(checksum): msg = "Cache '{}' not found. File '{}' won't be created." logger.warning(msg.format(checksum, str(path_info))) self.safe_remove(path_info, force=force) + skip = True + + if skip: + progress_callback(str(path_info), self.get_files_number(checksum)) return msg = "Checking out '{}' with cache '{}'." @@ -774,6 +779,15 @@ def _checkout( path_info, checksum, force, progress_callback=progress_callback ) + def get_files_number(self, checksum): + if not checksum: + return 0 + + if self.is_dir_checksum(checksum): + return len(self.get_dir_cache(checksum)) + + return 1 + @staticmethod def unprotect(path_info): pass diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index ea8e899386..086a2fd717 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -277,3 +277,5 @@ def exists_with_progress(chunks): in_remote = itertools.chain.from_iterable(results) ret = list(itertools.compress(checksums, in_remote)) return ret + + pbar.update_desc("", 0) # clear path name description diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 3430f94367..e335763a2d 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -50,8 +50,5 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False): ) ) - done = pbar.n stage.checkout(force=force, progress_callback=pbar.update_desc) - if done == pbar.n: - # callback not called - pbar.update(stage.get_all_files_number()) + pbar.update_desc("Checkout", 0) # clear path name description From 8f7a7a55e6db9b7a011511cbb5e6c7037bbd39e4 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 19 Aug 2019 11:59:25 +0100 Subject: [PATCH 63/66] fix logic --- dvc/remote/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 59d3d70ed0..a4c24ed414 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -748,12 +748,12 @@ def checkout( self.safe_remove(path_info, force=force) skip = True - if not self.changed(path_info, checksum_info): + elif not self.changed(path_info, checksum_info): msg = "Data '{}' didn't change." logger.debug(msg.format(str(path_info))) skip = True - if self.changed_cache(checksum): + elif self.changed_cache(checksum): msg = "Cache '{}' not found. File '{}' won't be created." logger.warning(msg.format(checksum, str(path_info))) self.safe_remove(path_info, force=force) From 9b73ff72e9a6c0578126a2253b932e5f58dd66b5 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Mon, 19 Aug 2019 13:31:15 +0100 Subject: [PATCH 64/66] avoid duplication https://github.com/iterative/dvc/pull/2333#discussion_r315183224 --- dvc/output/base.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dvc/output/base.py b/dvc/output/base.py index 4932660413..48fd5bebbd 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -314,13 +314,10 @@ def move(self, out): self.repo.scm.ignore(self.fspath) def get_files_number(self): - if not self.use_cache or not self.checksum: + if not self.use_cache: return 0 - if self.is_dir_checksum: - return len(self.dir_cache) - - return 1 + return self.cache.get_files_number(self.checksum) def unprotect(self): if self.exists: From 8faa02e837df77dbe7b407ef05ef9b3f9eabbaae Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 20 Aug 2019 14:18:43 +0100 Subject: [PATCH 65/66] quick review fixes --- dvc/remote/base.py | 4 +--- dvc/remote/local/__init__.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index a4c24ed414..19031dac86 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -158,9 +158,7 @@ def _calculate_checksums(self, file_infos): ) ) tasks = Tqdm(tasks, total=len(file_infos), unit="md5") - checksums = { - file_infos[index]: task for index, task in enumerate(tasks) - } + checksums = dict(zip(file_infos, tasks)) return checksums def _collect_dir(self, path_info): diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py index 48dea00624..8381e35113 100644 --- a/dvc/remote/local/__init__.py +++ b/dvc/remote/local/__init__.py @@ -545,7 +545,7 @@ def _update_unpacked_dir(self, checksum): def _create_unpacked_dir(self, checksum, dir_info, unpacked_dir_info): self.makedirs(unpacked_dir_info) - for entry in Tqdm(dir_info, desc="Creating unpacked dir", unit="dir"): + for entry in Tqdm(dir_info, desc="Creating unpacked dir", unit="file"): entry_cache_info = self.checksum_to_path_info( entry[self.PARAM_CHECKSUM] ) From 4bd2ce25e1e8e76c232b6a87002b0783b577084e Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Tue, 20 Aug 2019 14:27:20 +0100 Subject: [PATCH 66/66] get_files_number test fix --- tests/unit/output/test_local.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unit/output/test_local.py b/tests/unit/output/test_local.py index 8149e83ffc..6e2af11323 100644 --- a/tests/unit/output/test_local.py +++ b/tests/unit/output/test_local.py @@ -2,6 +2,7 @@ from dvc.stage import Stage from dvc.output import OutputLOCAL +from dvc.remote.local import RemoteLOCAL from tests.basic_env import TestDvc @@ -32,8 +33,12 @@ def test_return_0_on_no_cache(self): self.assertEqual(0, o.get_files_number()) @patch.object(OutputLOCAL, "checksum", "12345678.dir") - @patch.object(OutputLOCAL, "dir_cache", [{"md5": "asdf"}, {"md5": "qwe"}]) - def test_return_mutiple_for_dir(self): + @patch.object( + RemoteLOCAL, + "get_dir_cache", + return_value=[{"md5": "asdf"}, {"md5": "qwe"}], + ) + def test_return_mutiple_for_dir(self, mock_get_dir_cache): o = self._get_output() self.assertEqual(2, o.get_files_number())