From a4fcb842dd4b5bf98b7afc5042d8ec22326ac5f5 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 19 Jul 2020 13:56:12 +0100 Subject: [PATCH 01/11] initial schema augmentation --- dvc/output/__init__.py | 1 + dvc/output/base.py | 1 + dvc/schema.py | 16 ++++++++++++++-- dvc/tree/local.py | 1 + 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/dvc/output/__init__.py b/dvc/output/__init__.py index 7cfdf9e0fb..f48be196ff 100644 --- a/dvc/output/__init__.py +++ b/dvc/output/__init__.py @@ -55,6 +55,7 @@ SCHEMA = CHECKSUMS_SCHEMA.copy() SCHEMA[Required(BaseOutput.PARAM_PATH)] = str +SCHEMA[BaseOutput.PARAM_FILTER] = str SCHEMA[BaseOutput.PARAM_CACHE] = bool SCHEMA[BaseOutput.PARAM_METRIC] = BaseOutput.METRIC_SCHEMA SCHEMA[BaseOutput.PARAM_PLOT] = bool diff --git a/dvc/output/base.py b/dvc/output/base.py index f17e3df9e5..1f5c29d975 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -52,6 +52,7 @@ class BaseOutput: PARAM_PATH = "path" PARAM_CACHE = "cache" + PARAM_FILTER = "udf" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" PARAM_METRIC_XPATH = "xpath" diff --git a/dvc/schema.py b/dvc/schema.py index 0948f1f613..0b4bbd651e 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -17,7 +17,11 @@ StageParams.PARAM_ALWAYS_CHANGED: bool, } -DATA_SCHEMA = {**CHECKSUMS_SCHEMA, Required("path"): str} +DATA_SCHEMA = { + **CHECKSUMS_SCHEMA, + Required(BaseOutput.PARAM_PATH): str, + Optional(BaseOutput.PARAM_FILTER): str, +} LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, StageParams.PARAM_DEPS: [DATA_SCHEMA], @@ -51,7 +55,15 @@ str: { StageParams.PARAM_CMD: str, Optional(StageParams.PARAM_WDIR): str, - Optional(StageParams.PARAM_DEPS): [str], + Optional(StageParams.PARAM_DEPS): [ + Any( + str, + { + Required(BaseOutput.PARAM_PATH): str, + Optional(BaseOutput.PARAM_FILTER): str, + }, + ) + ], Optional(StageParams.PARAM_PARAMS): [ Any(str, PARAM_PSTAGE_NON_DEFAULT_SCHEMA) ], diff --git a/dvc/tree/local.py b/dvc/tree/local.py index bfce3345f8..f04cb17b06 100644 --- a/dvc/tree/local.py +++ b/dvc/tree/local.py @@ -30,6 +30,7 @@ class LocalTree(BaseTree): PATH_CLS = PathInfo PARAM_CHECKSUM = "md5" PARAM_PATH = "path" + PARAM_FILTER = "udf" TRAVERSE_PREFIX_LEN = 2 UNPACKED_DIR_SUFFIX = ".unpacked" From f1f7d54e62d7b2253fbee1c10fc967691f244722 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 19 Jul 2020 20:15:29 +0100 Subject: [PATCH 02/11] more schema updates --- dvc/dependency/__init__.py | 3 +++ dvc/output/__init__.py | 1 + dvc/output/base.py | 6 ++++++ dvc/schema.py | 1 + dvc/tree/local.py | 1 + 5 files changed, 12 insertions(+) diff --git a/dvc/dependency/__init__.py b/dvc/dependency/__init__.py index 91bd96c232..e189b6dad7 100644 --- a/dvc/dependency/__init__.py +++ b/dvc/dependency/__init__.py @@ -52,6 +52,9 @@ def _get(stage, p, info): + if isinstance(p, dict): + info.update(p) + p = info.pop("path", None) parsed = urlparse(p) if p else None if parsed and parsed.scheme == "remote": tree = get_cloud_tree(stage.repo, name=parsed.netloc) diff --git a/dvc/output/__init__.py b/dvc/output/__init__.py index f48be196ff..2f98389c66 100644 --- a/dvc/output/__init__.py +++ b/dvc/output/__init__.py @@ -56,6 +56,7 @@ SCHEMA = CHECKSUMS_SCHEMA.copy() SCHEMA[Required(BaseOutput.PARAM_PATH)] = str SCHEMA[BaseOutput.PARAM_FILTER] = str +SCHEMA[BaseOutput.PARAM_FILTER_CHECKSUM] = CHECKSUM_SCHEMA SCHEMA[BaseOutput.PARAM_CACHE] = bool SCHEMA[BaseOutput.PARAM_METRIC] = BaseOutput.METRIC_SCHEMA SCHEMA[BaseOutput.PARAM_PLOT] = bool diff --git a/dvc/output/base.py b/dvc/output/base.py index 1f5c29d975..742ede1e5c 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -53,6 +53,7 @@ class BaseOutput: PARAM_PATH = "path" PARAM_CACHE = "cache" PARAM_FILTER = "udf" + PARAM_FILTER_CHECKSUM = "udf_md5" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" PARAM_METRIC_XPATH = "xpath" @@ -177,6 +178,11 @@ def checksum(self): def checksum(self, checksum): self.info[self.tree.PARAM_CHECKSUM] = checksum + @property + def udf(self): + logger.warning("udf:%s info=%s", self.path_info, self.info) + return self.info.get(self.PARAM_FILTER) + def get_checksum(self): return self.tree.get_hash(self.path_info) diff --git a/dvc/schema.py b/dvc/schema.py index 0b4bbd651e..efabebfe44 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -21,6 +21,7 @@ **CHECKSUMS_SCHEMA, Required(BaseOutput.PARAM_PATH): str, Optional(BaseOutput.PARAM_FILTER): str, + Optional(BaseOutput.PARAM_FILTER_CHECKSUM): str, } LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, diff --git a/dvc/tree/local.py b/dvc/tree/local.py index f04cb17b06..278494108e 100644 --- a/dvc/tree/local.py +++ b/dvc/tree/local.py @@ -31,6 +31,7 @@ class LocalTree(BaseTree): PARAM_CHECKSUM = "md5" PARAM_PATH = "path" PARAM_FILTER = "udf" + PARAM_FILTER_CHECKSUM = "udf_md5" TRAVERSE_PREFIX_LEN = 2 UNPACKED_DIR_SUFFIX = ".unpacked" From ef27b350004b9c2a16c4f5ebd8b7e130afa147a8 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 15:32:08 +0100 Subject: [PATCH 03/11] schema update --- dvc/output/__init__.py | 1 - dvc/output/base.py | 3 +-- dvc/schema.py | 9 +-------- dvc/tree/local.py | 3 +-- 4 files changed, 3 insertions(+), 13 deletions(-) diff --git a/dvc/output/__init__.py b/dvc/output/__init__.py index 2f98389c66..f48be196ff 100644 --- a/dvc/output/__init__.py +++ b/dvc/output/__init__.py @@ -56,7 +56,6 @@ SCHEMA = CHECKSUMS_SCHEMA.copy() SCHEMA[Required(BaseOutput.PARAM_PATH)] = str SCHEMA[BaseOutput.PARAM_FILTER] = str -SCHEMA[BaseOutput.PARAM_FILTER_CHECKSUM] = CHECKSUM_SCHEMA SCHEMA[BaseOutput.PARAM_CACHE] = bool SCHEMA[BaseOutput.PARAM_METRIC] = BaseOutput.METRIC_SCHEMA SCHEMA[BaseOutput.PARAM_PLOT] = bool diff --git a/dvc/output/base.py b/dvc/output/base.py index 742ede1e5c..ccad9b303b 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -52,8 +52,7 @@ class BaseOutput: PARAM_PATH = "path" PARAM_CACHE = "cache" - PARAM_FILTER = "udf" - PARAM_FILTER_CHECKSUM = "udf_md5" + PARAM_FILTER = "cmd" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" PARAM_METRIC_XPATH = "xpath" diff --git a/dvc/schema.py b/dvc/schema.py index efabebfe44..869023ed43 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -21,7 +21,6 @@ **CHECKSUMS_SCHEMA, Required(BaseOutput.PARAM_PATH): str, Optional(BaseOutput.PARAM_FILTER): str, - Optional(BaseOutput.PARAM_FILTER_CHECKSUM): str, } LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, @@ -57,13 +56,7 @@ StageParams.PARAM_CMD: str, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [ - Any( - str, - { - Required(BaseOutput.PARAM_PATH): str, - Optional(BaseOutput.PARAM_FILTER): str, - }, - ) + Any(str, {str: {BaseOutput.PARAM_FILTER: str}}) ], Optional(StageParams.PARAM_PARAMS): [ Any(str, PARAM_PSTAGE_NON_DEFAULT_SCHEMA) diff --git a/dvc/tree/local.py b/dvc/tree/local.py index 278494108e..46283a8bd1 100644 --- a/dvc/tree/local.py +++ b/dvc/tree/local.py @@ -30,8 +30,7 @@ class LocalTree(BaseTree): PATH_CLS = PathInfo PARAM_CHECKSUM = "md5" PARAM_PATH = "path" - PARAM_FILTER = "udf" - PARAM_FILTER_CHECKSUM = "udf_md5" + PARAM_FILTER = "cmd" TRAVERSE_PREFIX_LEN = 2 UNPACKED_DIR_SUFFIX = ".unpacked" From b8bb670ad2557f4b30741a387c517ebb1fb84d25 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 19:00:01 +0100 Subject: [PATCH 04/11] fix schema --- dvc/dependency/__init__.py | 8 ++++++-- dvc/dependency/base.py | 1 + dvc/output/__init__.py | 1 - dvc/output/base.py | 1 - dvc/schema.py | 13 ++++++------- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/dvc/dependency/__init__.py b/dvc/dependency/__init__.py index e189b6dad7..c8f6ff5c21 100644 --- a/dvc/dependency/__init__.py +++ b/dvc/dependency/__init__.py @@ -3,6 +3,7 @@ import dvc.output as output from dvc.dependency.azure import AzureDependency +from dvc.dependency.base import BaseDependency from dvc.dependency.gs import GSDependency from dvc.dependency.hdfs import HDFSDependency from dvc.dependency.http import HTTPDependency @@ -49,12 +50,15 @@ del SCHEMA[BaseOutput.PARAM_METRIC] SCHEMA.update(RepoDependency.REPO_SCHEMA) SCHEMA.update(ParamsDependency.PARAM_SCHEMA) +SCHEMA.update({BaseDependency.PARAM_FILTER: str}) def _get(stage, p, info): if isinstance(p, dict): - info.update(p) - p = info.pop("path", None) + p = list(p.items()) + assert len(p) == 1 + p, extra_info = p[0] # PARAM_FILTER + info.update(extra_info) parsed = urlparse(p) if p else None if parsed and parsed.scheme == "remote": tree = get_cloud_tree(stage.repo, name=parsed.netloc) diff --git a/dvc/dependency/base.py b/dvc/dependency/base.py index a78c9e389c..8bd2d0511e 100644 --- a/dvc/dependency/base.py +++ b/dvc/dependency/base.py @@ -20,6 +20,7 @@ def __init__(self, path): class BaseDependency: IS_DEPENDENCY = True + PARAM_FILTER = "cmd" DoesNotExistError = DependencyDoesNotExistError IsNotFileOrDirError = DependencyIsNotFileOrDirError diff --git a/dvc/output/__init__.py b/dvc/output/__init__.py index f48be196ff..7cfdf9e0fb 100644 --- a/dvc/output/__init__.py +++ b/dvc/output/__init__.py @@ -55,7 +55,6 @@ SCHEMA = CHECKSUMS_SCHEMA.copy() SCHEMA[Required(BaseOutput.PARAM_PATH)] = str -SCHEMA[BaseOutput.PARAM_FILTER] = str SCHEMA[BaseOutput.PARAM_CACHE] = bool SCHEMA[BaseOutput.PARAM_METRIC] = BaseOutput.METRIC_SCHEMA SCHEMA[BaseOutput.PARAM_PLOT] = bool diff --git a/dvc/output/base.py b/dvc/output/base.py index ccad9b303b..fa38b249c3 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -52,7 +52,6 @@ class BaseOutput: PARAM_PATH = "path" PARAM_CACHE = "cache" - PARAM_FILTER = "cmd" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" PARAM_METRIC_XPATH = "xpath" diff --git a/dvc/schema.py b/dvc/schema.py index 869023ed43..b60a6b9b77 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -1,6 +1,7 @@ from voluptuous import Any, Optional, Required, Schema from dvc import dependency, output +from dvc.dependency.base import BaseDependency from dvc.output import CHECKSUMS_SCHEMA, BaseOutput from dvc.stage.params import StageParams @@ -17,14 +18,12 @@ StageParams.PARAM_ALWAYS_CHANGED: bool, } -DATA_SCHEMA = { - **CHECKSUMS_SCHEMA, - Required(BaseOutput.PARAM_PATH): str, - Optional(BaseOutput.PARAM_FILTER): str, -} +DATA_SCHEMA = {**CHECKSUMS_SCHEMA, Required(BaseOutput.PARAM_PATH): str} LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, - StageParams.PARAM_DEPS: [DATA_SCHEMA], + StageParams.PARAM_DEPS: [ + {**DATA_SCHEMA, Optional(BaseDependency.PARAM_FILTER): str} + ], StageParams.PARAM_PARAMS: {str: {str: object}}, StageParams.PARAM_OUTS: [DATA_SCHEMA], } @@ -56,7 +55,7 @@ StageParams.PARAM_CMD: str, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [ - Any(str, {str: {BaseOutput.PARAM_FILTER: str}}) + Any(str, {str: {BaseDependency.PARAM_FILTER: str}}) ], Optional(StageParams.PARAM_PARAMS): [ Any(str, PARAM_PSTAGE_NON_DEFAULT_SCHEMA) From 6269f4480d9628c93cf849224dc521d35290c2eb Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 21:28:12 +0100 Subject: [PATCH 05/11] working PoC TODO: insert cmd into dvc.lock --- dvc/dependency/__init__.py | 3 +-- dvc/dependency/base.py | 1 - dvc/output/base.py | 6 +++--- dvc/schema.py | 5 ++--- dvc/tree/base.py | 6 +++--- dvc/tree/local.py | 5 ++--- dvc/utils/__init__.py | 21 ++++++++++++++++++++- 7 files changed, 31 insertions(+), 16 deletions(-) diff --git a/dvc/dependency/__init__.py b/dvc/dependency/__init__.py index c8f6ff5c21..803deccfbe 100644 --- a/dvc/dependency/__init__.py +++ b/dvc/dependency/__init__.py @@ -3,7 +3,6 @@ import dvc.output as output from dvc.dependency.azure import AzureDependency -from dvc.dependency.base import BaseDependency from dvc.dependency.gs import GSDependency from dvc.dependency.hdfs import HDFSDependency from dvc.dependency.http import HTTPDependency @@ -50,7 +49,7 @@ del SCHEMA[BaseOutput.PARAM_METRIC] SCHEMA.update(RepoDependency.REPO_SCHEMA) SCHEMA.update(ParamsDependency.PARAM_SCHEMA) -SCHEMA.update({BaseDependency.PARAM_FILTER: str}) +SCHEMA.update({BaseOutput.PARAM_FILTER: str}) def _get(stage, p, info): diff --git a/dvc/dependency/base.py b/dvc/dependency/base.py index 8bd2d0511e..a78c9e389c 100644 --- a/dvc/dependency/base.py +++ b/dvc/dependency/base.py @@ -20,7 +20,6 @@ def __init__(self, path): class BaseDependency: IS_DEPENDENCY = True - PARAM_FILTER = "cmd" DoesNotExistError = DependencyDoesNotExistError IsNotFileOrDirError = DependencyIsNotFileOrDirError diff --git a/dvc/output/base.py b/dvc/output/base.py index fa38b249c3..f57ba428d3 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -52,6 +52,7 @@ class BaseOutput: PARAM_PATH = "path" PARAM_CACHE = "cache" + PARAM_FILTER = "cmd" PARAM_METRIC = "metric" PARAM_METRIC_TYPE = "type" PARAM_METRIC_XPATH = "xpath" @@ -177,8 +178,7 @@ def checksum(self, checksum): self.info[self.tree.PARAM_CHECKSUM] = checksum @property - def udf(self): - logger.warning("udf:%s info=%s", self.path_info, self.info) + def filter_cmd(self): return self.info.get(self.PARAM_FILTER) def get_checksum(self): @@ -193,7 +193,7 @@ def exists(self): return self.tree.exists(self.path_info) def save_info(self): - return self.tree.save_info(self.path_info) + return self.tree.save_info(self.path_info, cmd=self.filter_cmd) def changed_checksum(self): return self.checksum != self.get_checksum() diff --git a/dvc/schema.py b/dvc/schema.py index b60a6b9b77..3005e1c375 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -1,7 +1,6 @@ from voluptuous import Any, Optional, Required, Schema from dvc import dependency, output -from dvc.dependency.base import BaseDependency from dvc.output import CHECKSUMS_SCHEMA, BaseOutput from dvc.stage.params import StageParams @@ -22,7 +21,7 @@ LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, StageParams.PARAM_DEPS: [ - {**DATA_SCHEMA, Optional(BaseDependency.PARAM_FILTER): str} + {**DATA_SCHEMA, Optional(BaseOutput.PARAM_FILTER): str} ], StageParams.PARAM_PARAMS: {str: {str: object}}, StageParams.PARAM_OUTS: [DATA_SCHEMA], @@ -55,7 +54,7 @@ StageParams.PARAM_CMD: str, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [ - Any(str, {str: {BaseDependency.PARAM_FILTER: str}}) + Any(str, {str: {BaseOutput.PARAM_FILTER: str}}) ], Optional(StageParams.PARAM_PARAMS): [ Any(str, PARAM_PSTAGE_NON_DEFAULT_SCHEMA) diff --git a/dvc/tree/base.py b/dvc/tree/base.py index 1860bafab6..610347a11b 100644 --- a/dvc/tree/base.py +++ b/dvc/tree/base.py @@ -236,7 +236,7 @@ def is_dir_hash(cls, hash_): return False return hash_.endswith(cls.CHECKSUM_DIR_SUFFIX) - def get_hash(self, path_info, **kwargs): + def get_hash(self, path_info, cmd=None, **kwargs): assert path_info and ( isinstance(path_info, str) or path_info.scheme == self.scheme ) @@ -265,14 +265,14 @@ def get_hash(self, path_info, **kwargs): if self.isdir(path_info): hash_ = self.get_dir_hash(path_info, **kwargs) else: - hash_ = self.get_file_hash(path_info) + hash_ = self.get_file_hash(path_info, cmd=cmd) if hash_ and self.exists(path_info): self.state.save(path_info, hash_) return hash_ - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): raise NotImplementedError def get_dir_hash(self, path_info, **kwargs): diff --git a/dvc/tree/local.py b/dvc/tree/local.py index 46283a8bd1..6506c9b4b3 100644 --- a/dvc/tree/local.py +++ b/dvc/tree/local.py @@ -30,7 +30,6 @@ class LocalTree(BaseTree): PATH_CLS = PathInfo PARAM_CHECKSUM = "md5" PARAM_PATH = "path" - PARAM_FILTER = "cmd" TRAVERSE_PREFIX_LEN = 2 UNPACKED_DIR_SUFFIX = ".unpacked" @@ -298,8 +297,8 @@ def is_protected(self, path_info): return stat.S_IMODE(mode) == self.CACHE_MODE - def get_file_hash(self, path_info): - return file_md5(path_info)[0] + def get_file_hash(self, path_info, cmd=None): + return file_md5(path_info, cmd=cmd)[0] @staticmethod def getsize(path_info): diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 8fece228ff..2bababbe12 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -7,7 +7,9 @@ import math import os import re +import subprocess import sys +import tempfile import time import colorama @@ -43,7 +45,7 @@ def _fobj_md5(fobj, hash_md5, binary, progress_func=None): progress_func(len(data)) -def file_md5(fname, tree=None): +def file_md5(fname, tree=None, cmd=None): """ get the (md5 hexdigest, md5 digest) of a file """ from dvc.progress import Tqdm from dvc.istextfile import istextfile @@ -58,6 +60,19 @@ def file_md5(fname, tree=None): open_func = open if exists_func(fname): + filtered = None + if cmd: + p = subprocess.Popen( + cmd.split() + [fname], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + out, err = p.communicate() + if p.returncode != 0: + raise RuntimeError(err) + with tempfile.NamedTemporaryFile(delete=False) as fobj: + fobj.write(out) + fname = filtered = fobj.name hash_md5 = hashlib.md5() binary = not istextfile(fname, tree=tree) size = stat_func(fname).st_size @@ -80,6 +95,10 @@ def file_md5(fname, tree=None): with open_func(fname, "rb") as fobj: _fobj_md5(fobj, hash_md5, binary, pbar.update) + if filtered is not None: + from dvc.utils.fs import remove + + remove(filtered) return (hash_md5.hexdigest(), hash_md5.digest()) return (None, None) From dae2d46251ebd19fc563a10cdfb82a6b9d4ffa5d Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 21:37:30 +0100 Subject: [PATCH 06/11] minor logging tidy --- dvc/utils/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 2bababbe12..4b02710133 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -45,7 +45,7 @@ def _fobj_md5(fobj, hash_md5, binary, progress_func=None): progress_func(len(data)) -def file_md5(fname, tree=None, cmd=None): +def file_md5(fname, tree=None, cmd="cat"): """ get the (md5 hexdigest, md5 digest) of a file """ from dvc.progress import Tqdm from dvc.istextfile import istextfile @@ -61,7 +61,7 @@ def file_md5(fname, tree=None, cmd=None): if exists_func(fname): filtered = None - if cmd: + if cmd != "cat": p = subprocess.Popen( cmd.split() + [fname], stdout=subprocess.PIPE, @@ -69,8 +69,10 @@ def file_md5(fname, tree=None, cmd=None): ) out, err = p.communicate() if p.returncode != 0: + logger.error("filtering:%s %s", cmd, fname) raise RuntimeError(err) with tempfile.NamedTemporaryFile(delete=False) as fobj: + logger.debug("filtering:%s %s > %s", cmd, fname, fobj.name) fobj.write(out) fname = filtered = fobj.name hash_md5 = hashlib.md5() From fc2d712912948ccefe37afd5fdfdba5516a06122 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 21:48:49 +0100 Subject: [PATCH 07/11] update all get_file_hash --- dvc/repo/tree.py | 7 ++++--- dvc/tree/azure.py | 3 ++- dvc/tree/gdrive.py | 2 +- dvc/tree/gs.py | 3 ++- dvc/tree/hdfs.py | 3 ++- dvc/tree/http.py | 3 ++- dvc/tree/s3.py | 3 ++- dvc/tree/ssh/__init__.py | 3 ++- 8 files changed, 17 insertions(+), 10 deletions(-) diff --git a/dvc/repo/tree.py b/dvc/repo/tree.py index 8011d0318c..1f30e5024a 100644 --- a/dvc/repo/tree.py +++ b/dvc/repo/tree.py @@ -226,7 +226,8 @@ def isdvc(self, path, **kwargs): def isexec(self, path): # pylint: disable=unused-argument return False - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError outs = self._find_outs(path_info, strict=False) if len(outs) != 1: raise OutputNotFoundError @@ -404,7 +405,7 @@ def walk_files(self, top, **kwargs): # pylint: disable=arguments-differ for fname in files: yield PathInfo(root) / fname - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): """Return file checksum for specified path. If path_info is a DVC out, the pre-computed checksum for the file @@ -418,7 +419,7 @@ def get_file_hash(self, path_info): return self.dvctree.get_file_hash(path_info) except OutputNotFoundError: pass - return file_md5(path_info, self)[0] + return file_md5(path_info, self, cmd=cmd)[0] def copytree(self, top, dest): top = PathInfo(top) diff --git a/dvc/tree/azure.py b/dvc/tree/azure.py index 19f9ecdcb9..1829cff04b 100644 --- a/dvc/tree/azure.py +++ b/dvc/tree/azure.py @@ -139,7 +139,8 @@ def remove(self, path_info): logger.debug(f"Removing {path_info}") self.blob_service.delete_blob(path_info.bucket, path_info.path) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError return self.get_etag(path_info) def _upload( diff --git a/dvc/tree/gdrive.py b/dvc/tree/gdrive.py index abf48d17ab..799663adad 100644 --- a/dvc/tree/gdrive.py +++ b/dvc/tree/gdrive.py @@ -573,7 +573,7 @@ def remove(self, path_info): item_id = self._get_item_id(path_info) self.gdrive_delete_file(item_id) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): raise NotImplementedError def _upload(self, from_file, to_info, name=None, no_progress_bar=False): diff --git a/dvc/tree/gs.py b/dvc/tree/gs.py index 767154de35..df4473abab 100644 --- a/dvc/tree/gs.py +++ b/dvc/tree/gs.py @@ -182,7 +182,8 @@ def copy(self, from_info, to_info): to_bucket = self.gs.bucket(to_info.bucket) from_bucket.copy_blob(blob, to_bucket, new_name=to_info.path) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError import base64 import codecs diff --git a/dvc/tree/hdfs.py b/dvc/tree/hdfs.py index cdc9a2c339..d064718eb8 100644 --- a/dvc/tree/hdfs.py +++ b/dvc/tree/hdfs.py @@ -161,7 +161,8 @@ def _group(regex, s, gname): assert match is not None return match.group(gname) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError # NOTE: pyarrow doesn't support checksum, so we need to use hadoop regex = r".*\t.*\t(?P.*)" stdout = self.hadoop_fs( diff --git a/dvc/tree/http.py b/dvc/tree/http.py index abed95ca4f..b62042fe3c 100644 --- a/dvc/tree/http.py +++ b/dvc/tree/http.py @@ -125,7 +125,8 @@ def request(self, method, url, **kwargs): def exists(self, path_info, use_dvcignore=True): return bool(self.request("HEAD", path_info.url)) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError url = path_info.url headers = self.request("HEAD", url).headers etag = headers.get("ETag") or headers.get("Content-MD5") diff --git a/dvc/tree/s3.py b/dvc/tree/s3.py index 44368d7461..7a4a45c7c5 100644 --- a/dvc/tree/s3.py +++ b/dvc/tree/s3.py @@ -317,7 +317,8 @@ def _copy(cls, s3, from_info, to_info, extra_args): if etag != cached_etag: raise ETagMismatchError(etag, cached_etag) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError return self.get_etag(self.s3, path_info.bucket, path_info.path) def _upload(self, from_file, to_info, name=None, no_progress_bar=False): diff --git a/dvc/tree/ssh/__init__.py b/dvc/tree/ssh/__init__.py index 26bee19388..20560f8e6f 100644 --- a/dvc/tree/ssh/__init__.py +++ b/dvc/tree/ssh/__init__.py @@ -233,7 +233,8 @@ def reflink(self, from_info, to_info): with self.ssh(from_info) as ssh: ssh.reflink(from_info.path, to_info.path) - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError if path_info.scheme != self.scheme: raise NotImplementedError From c67d0b86dabef8e6c82941083c8da2db6f146d82 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 21:55:38 +0100 Subject: [PATCH 08/11] fix silly `cat` issue --- dvc/utils/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 4b02710133..15966ba0ad 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -45,8 +45,10 @@ def _fobj_md5(fobj, hash_md5, binary, progress_func=None): progress_func(len(data)) -def file_md5(fname, tree=None, cmd="cat"): - """ get the (md5 hexdigest, md5 digest) of a file """ +def file_md5(fname, tree=None, cmd=None): + """ + Returns (md5_hexdigest, md5_digest) of `cmd file` (default: `cmd=cat`) + """ from dvc.progress import Tqdm from dvc.istextfile import istextfile @@ -61,7 +63,7 @@ def file_md5(fname, tree=None, cmd="cat"): if exists_func(fname): filtered = None - if cmd != "cat": + if cmd: p = subprocess.Popen( cmd.split() + [fname], stdout=subprocess.PIPE, From f16b9ae83b71f3f2a1a895c6bd0b12a0ee5731d7 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 22:37:22 +0100 Subject: [PATCH 09/11] attempt at adding to lockfile --- dvc/output/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dvc/output/base.py b/dvc/output/base.py index f57ba428d3..d5f303c090 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -318,6 +318,8 @@ def dumpd(self): if self.persist: ret[self.PARAM_PERSIST] = self.persist + if self.filter_cmd: + ret[self.PARAM_FILTER] = self.filter_cmd return ret def verify_metric(self): From e58ae3746e53bfa9e7272a5c08101755cd3fd5b8 Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sat, 8 Aug 2020 23:22:31 +0100 Subject: [PATCH 10/11] filter in save_info --- dvc/tree/base.py | 8 ++++++-- dvc/tree/local.py | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/dvc/tree/base.py b/dvc/tree/base.py index 610347a11b..828a379411 100644 --- a/dvc/tree/base.py +++ b/dvc/tree/base.py @@ -63,6 +63,7 @@ class BaseTree: CACHE_MODE = None SHARED_MODE_MAP = {None: (None, None), "group": (None, None)} PARAM_CHECKSUM = None + PARAM_FILTER = None state = StateNoop() @@ -293,8 +294,11 @@ def path_to_hash(self, path): return "".join(parts) - def save_info(self, path_info, **kwargs): - return {self.PARAM_CHECKSUM: self.get_hash(path_info, **kwargs)} + def save_info(self, path_info, cmd=None, **kwargs): + ret = {self.PARAM_CHECKSUM: self.get_hash(path_info, **kwargs)} + if cmd: + ret[self.PARAM_FILTER] = cmd + return ret def _calculate_hashes(self, file_infos): file_infos = list(file_infos) diff --git a/dvc/tree/local.py b/dvc/tree/local.py index 6506c9b4b3..f686e5f437 100644 --- a/dvc/tree/local.py +++ b/dvc/tree/local.py @@ -29,6 +29,7 @@ class LocalTree(BaseTree): scheme = Schemes.LOCAL PATH_CLS = PathInfo PARAM_CHECKSUM = "md5" + PARAM_FILTER = "cmd" PARAM_PATH = "path" TRAVERSE_PREFIX_LEN = 2 UNPACKED_DIR_SUFFIX = ".unpacked" From 33b4e286f5d9b40b231eba364a3804214909f3bd Mon Sep 17 00:00:00 2001 From: Casper da Costa-Luis Date: Sun, 9 Aug 2020 00:07:33 +0100 Subject: [PATCH 11/11] webdav: get_file_hash(cmd) --- dvc/tree/webdav.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dvc/tree/webdav.py b/dvc/tree/webdav.py index 115de31767..282851707e 100644 --- a/dvc/tree/webdav.py +++ b/dvc/tree/webdav.py @@ -131,7 +131,8 @@ def exists(self, path_info, use_dvcignore=True): return self._client.check(path_info.path) # Gets file hash 'etag' - def get_file_hash(self, path_info): + def get_file_hash(self, path_info, cmd=None): + assert not cmd, NotImplementedError # Use webdav client info method to get etag etag = self._client.info(path_info.path)["etag"].strip('"')