From 760cc1b8f82e8553b27686b9900566186ef79f71 Mon Sep 17 00:00:00 2001 From: Guido Diepen Date: Tue, 7 Nov 2023 21:31:18 +0100 Subject: [PATCH 1/3] webhdfs: add support for webhdfs data_proxy --- dvc_webhdfs/__init__.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/dvc_webhdfs/__init__.py b/dvc_webhdfs/__init__.py index dbcb237..354bc67 100644 --- a/dvc_webhdfs/__init__.py +++ b/dvc_webhdfs/__init__.py @@ -49,6 +49,17 @@ def _prepare_credentials(self, **config): def fs(self): from fsspec.implementations.webhdfs import WebHDFS + # If target data_proxy provided construct the source from host/port + if "data_proxy_target" in self.fs_args: + host = self.fs_args["host"] + port = self.fs_args["port"] + + protocol = "https" if self.fs_args.get("use_https") else "http" + + self.fs_args["data_proxy"] = { + f"{protocol}://{host}:{port}/webhdfs/v1":self.fs_args["data_proxy_target"] + } + fs = WebHDFS(**self.fs_args) fs.session.verify = self._ssl_verify return fs From 1506cdc7858d74f134e02c70360c5be3f2017a2d Mon Sep 17 00:00:00 2001 From: Guido Diepen Date: Wed, 8 Nov 2023 20:29:34 +0100 Subject: [PATCH 2/3] webhdfs: introduce additional temp variable --- dvc_webhdfs/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dvc_webhdfs/__init__.py b/dvc_webhdfs/__init__.py index 354bc67..0e68a87 100644 --- a/dvc_webhdfs/__init__.py +++ b/dvc_webhdfs/__init__.py @@ -56,8 +56,9 @@ def fs(self): protocol = "https" if self.fs_args.get("use_https") else "http" + source_url = f"{protocol}://{host}:{port}/webhdfs/v1" self.fs_args["data_proxy"] = { - f"{protocol}://{host}:{port}/webhdfs/v1":self.fs_args["data_proxy_target"] + source_url: self.fs_args["data_proxy_target"] } fs = WebHDFS(**self.fs_args) From f4a4fcc5560acef140c043b96cd045feed5ab476 Mon Sep 17 00:00:00 2001 From: Guido Diepen Date: Wed, 8 Nov 2023 22:33:16 +0100 Subject: [PATCH 3/3] webhdfs: move code to _prepare_credentials Addressing comment of @efiop --- dvc_webhdfs/__init__.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/dvc_webhdfs/__init__.py b/dvc_webhdfs/__init__.py index 0e68a87..f2ebd3a 100644 --- a/dvc_webhdfs/__init__.py +++ b/dvc_webhdfs/__init__.py @@ -42,6 +42,17 @@ def _prepare_credentials(self, **config): principal = config.pop("kerberos_principal", None) if principal: config["kerb_kwargs"] = {"principal": principal} + + # If target data_proxy provided construct the source from host/port + data_proxy_target = config.pop("data_proxy_target", None) + if data_proxy_target: + host = config["host"] + port = config["port"] + + protocol = "https" if config.get("use_https") else "http" + + source_url = f"{protocol}://{host}:{port}/webhdfs/v1" + config["data_proxy"] = {source_url: data_proxy_target} return config @wrap_prop(threading.Lock()) @@ -49,18 +60,6 @@ def _prepare_credentials(self, **config): def fs(self): from fsspec.implementations.webhdfs import WebHDFS - # If target data_proxy provided construct the source from host/port - if "data_proxy_target" in self.fs_args: - host = self.fs_args["host"] - port = self.fs_args["port"] - - protocol = "https" if self.fs_args.get("use_https") else "http" - - source_url = f"{protocol}://{host}:{port}/webhdfs/v1" - self.fs_args["data_proxy"] = { - source_url: self.fs_args["data_proxy_target"] - } - fs = WebHDFS(**self.fs_args) fs.session.verify = self._ssl_verify return fs