Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dvc/commands/live.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import argparse
from pathlib import Path

from dvc.cli.command import CmdBase
from dvc.cli.utils import fix_subparsers
Expand All @@ -16,6 +15,8 @@ def _run(self, target, revs=None):
metrics, plots = self.repo.live.show(target=target, revs=revs)

if plots:
from pathlib import Path

html_path = Path.cwd() / (self.args.target + "_html")

renderers = match_renderers(plots, self.repo.plots.templates)
Expand Down
49 changes: 49 additions & 0 deletions dvc/fs/_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import fsspec

from dvc.progress import Tqdm


class FsspecCallback(fsspec.Callback):
def __init__(self, progress_bar):
self.progress_bar = progress_bar
super().__init__()

def set_size(self, size):
if size is not None:
self.progress_bar.total = size
self.progress_bar.refresh()
super().set_size(size)

def relative_update(self, inc=1):
self.progress_bar.update(inc)
super().relative_update(inc)

def absolute_update(self, value):
self.progress_bar.update_to(value)
super().absolute_update(value)

@staticmethod
def wrap_fn(cb, fn):
def wrapped(*args, **kwargs):
res = fn(*args, **kwargs)
cb.relative_update()
return res

return wrapped


def tdqm_or_callback_wrapped(
fobj, method, total, callback=None, **pbar_kwargs
):
if callback:
from funcy import nullcontext
from tqdm.utils import CallbackIOWrapper

callback.set_size(total)
wrapper = CallbackIOWrapper(callback.relative_update, fobj, method)
return nullcontext(wrapper)

return Tqdm.wrapattr(fobj, method, total=total, bytes=True, **pbar_kwargs)


DEFAULT_CALLBACK = fsspec.callbacks.NoOpCallback()
2 changes: 1 addition & 1 deletion dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from tqdm.utils import CallbackIOWrapper

from dvc.exceptions import DvcException
from dvc.progress import DEFAULT_CALLBACK, FsspecCallback
from dvc.fs._callback import DEFAULT_CALLBACK, FsspecCallback
from dvc.ui import ui
from dvc.utils import tmp_fname
from dvc.utils.fs import makedirs, move
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dvc.exceptions import OutputNotFoundError
from dvc.utils import relpath

from ..progress import DEFAULT_CALLBACK
from ._callback import DEFAULT_CALLBACK
from ._metadata import Metadata
from .base import FileSystem

Expand Down
3 changes: 1 addition & 2 deletions dvc/fs/fsspec_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from funcy import cached_property
from tqdm.utils import CallbackIOWrapper

from dvc.progress import DEFAULT_CALLBACK

from ._callback import DEFAULT_CALLBACK
from .base import FileSystem

FSPath = str
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from funcy import cached_property, memoize, wrap_with

from dvc import prompt
from dvc.progress import DEFAULT_CALLBACK
from dvc.scheme import Schemes

from ._callback import DEFAULT_CALLBACK
from .fsspec_wrapper import AnyFSPath, FSSpecWrapper, NoDirectoriesMixin


Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dvc.utils import is_exec, tmp_fname
from dvc.utils.fs import copy_fobj_to_file, copyfile, makedirs, move, remove

from ..progress import DEFAULT_CALLBACK
from ._callback import DEFAULT_CALLBACK
from .base import FileSystem

logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from funcy import lfilter, wrap_with

from ..progress import DEFAULT_CALLBACK
from ._callback import DEFAULT_CALLBACK
from .base import FileSystem
from .dvc import DvcFileSystem

Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

from funcy import cached_property, wrap_prop

from dvc.progress import DEFAULT_CALLBACK
from dvc.scheme import Schemes

from ._callback import DEFAULT_CALLBACK
from .fsspec_wrapper import ObjectFSWrapper

_AWS_CONFIG_PATH = os.path.join(os.path.expanduser("~"), ".aws", "config")
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dvc.scheme import Schemes
from dvc.utils.fs import as_atomic

from ..progress import DEFAULT_CALLBACK
from ._callback import DEFAULT_CALLBACK
from .fsspec_wrapper import FSSpecWrapper

_SSH_TIMEOUT = 60 * 30
Expand Down
49 changes: 2 additions & 47 deletions dvc/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
from threading import RLock

import fsspec
from tqdm import tqdm

from dvc.env import DVC_IGNORE_ISATTY
Expand Down Expand Up @@ -137,6 +136,8 @@ def wrapped(*args, **kwargs):
return wrapped

def as_callback(self):
from dvc.fs._callback import FsspecCallback

return FsspecCallback(self)

def close(self):
Expand Down Expand Up @@ -164,49 +165,3 @@ def format_dict(self):
d["ncols_desc"] = d["ncols_info"] = 1
d["prefix"] = ""
return d


class FsspecCallback(fsspec.Callback):
def __init__(self, progress_bar):
self.progress_bar = progress_bar
super().__init__()

def set_size(self, size):
if size is not None:
self.progress_bar.total = size
self.progress_bar.refresh()
super().set_size(size)

def relative_update(self, inc=1):
self.progress_bar.update(inc)
super().relative_update(inc)

def absolute_update(self, value):
self.progress_bar.update_to(value)
super().absolute_update(value)

@staticmethod
def wrap_fn(cb, fn):
def wrapped(*args, **kwargs):
res = fn(*args, **kwargs)
cb.relative_update()
return res

return wrapped


def tdqm_or_callback_wrapped(
fobj, method, total, callback=None, **pbar_kwargs
):
if callback:
from funcy import nullcontext
from tqdm.utils import CallbackIOWrapper

callback.set_size(total)
wrapper = CallbackIOWrapper(callback.relative_update, fobj, method)
return nullcontext(wrapper)

return Tqdm.wrapattr(fobj, method, total=total, bytes=True, **pbar_kwargs)


DEFAULT_CALLBACK = fsspec.callbacks.NoOpCallback()
2 changes: 1 addition & 1 deletion dvc/utils/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def copyfile(src, dest, callback=None, no_progress_bar=False, name=None):
try:
System.reflink(src, dest)
except OSError:
from dvc.progress import tdqm_or_callback_wrapped
from dvc.fs._callback import tdqm_or_callback_wrapped

with open(src, "rb") as fsrc, open(dest, "wb+") as fdest:
with tdqm_or_callback_wrapped(
Expand Down