diff --git a/src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py b/src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py index 408e20442d2..217cbc9f9c9 100644 --- a/src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py +++ b/src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py @@ -24,6 +24,7 @@ Set, Tuple, Union, + cast, ) import gcsfs @@ -37,6 +38,9 @@ PathType = Union[bytes, str] +GCP_PATH_PREFIX = "gs://" + + class GCPArtifactStore(BaseArtifactStore, AuthenticationMixin): """Artifact Store for Google Cloud Storage based artifacts.""" @@ -44,7 +48,7 @@ class GCPArtifactStore(BaseArtifactStore, AuthenticationMixin): # Class Configuration FLAVOR: ClassVar[str] = GCP_ARTIFACT_STORE_FLAVOR - SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"gs://"} + SUPPORTED_SCHEMES: ClassVar[Set[str]] = {GCP_PATH_PREFIX} @property def filesystem(self) -> gcsfs.GCSFileSystem: @@ -127,7 +131,10 @@ def glob(self, pattern: PathType) -> List[PathType]: Returns: A list of paths that match the given glob pattern. """ - return self.filesystem.glob(path=pattern) # type: ignore[no-any-return] + return [ + f"{GCP_PATH_PREFIX}{path}" + for path in self.filesystem.glob(path=pattern) + ] def isdir(self, path: PathType) -> bool: """Check whether a path is a directory. @@ -149,7 +156,27 @@ def listdir(self, path: PathType) -> List[PathType]: Returns: A list of paths of files in the directory. """ - return self.filesystem.listdir(path=path) # type: ignore[no-any-return] + path_without_prefix = convert_to_str(path) + if path_without_prefix.startswith(GCP_PATH_PREFIX): + path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :] + + def _extract_basename(file_dict: Dict[str, Any]) -> str: + """Extracts the basename from a file info dict returned by GCP. + + Args: + file_dict: A file info dict returned by the GCP filesystem. + + Returns: + The basename of the file. + """ + file_path = cast(str, file_dict["name"]) + base_name = file_path[len(path_without_prefix) :] + return base_name.lstrip("/") + + return [ + _extract_basename(dict_) + for dict_ in self.filesystem.listdir(path=path) + ] def makedirs(self, path: PathType) -> None: """Create a directory at the given path. @@ -235,10 +262,15 @@ def walk( topdown: Unused argument to conform to interface. onerror: Unused argument to conform to interface. - Returns: + Yields: An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. """ # TODO [ENG-153]: Additional params - return self.filesystem.walk(path=top) # type: ignore[no-any-return] + for ( + directory, + subdirectories, + files, + ) in self.filesystem.walk(path=top): + yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files