Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Set,
Tuple,
Union,
cast,
)

import gcsfs
Expand All @@ -37,14 +38,17 @@
PathType = Union[bytes, str]


GCP_PATH_PREFIX = "gs://"


class GCPArtifactStore(BaseArtifactStore, AuthenticationMixin):
"""Artifact Store for Google Cloud Storage based artifacts."""

_filesystem: Optional[gcsfs.GCSFileSystem] = None

# Class Configuration
FLAVOR: ClassVar[str] = GCP_ARTIFACT_STORE_FLAVOR
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"gs://"}
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {GCP_PATH_PREFIX}

@property
def filesystem(self) -> gcsfs.GCSFileSystem:
Expand Down Expand Up @@ -127,7 +131,10 @@ def glob(self, pattern: PathType) -> List[PathType]:
Returns:
A list of paths that match the given glob pattern.
"""
return self.filesystem.glob(path=pattern) # type: ignore[no-any-return]
return [
f"{GCP_PATH_PREFIX}{path}"
for path in self.filesystem.glob(path=pattern)
]

def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Expand All @@ -149,7 +156,27 @@ def listdir(self, path: PathType) -> List[PathType]:
Returns:
A list of paths of files in the directory.
"""
return self.filesystem.listdir(path=path) # type: ignore[no-any-return]
path_without_prefix = convert_to_str(path)
if path_without_prefix.startswith(GCP_PATH_PREFIX):
path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by GCP.

Args:
file_dict: A file info dict returned by the GCP filesystem.

Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path_without_prefix) :]
return base_name.lstrip("/")

return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]

def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
Expand Down Expand Up @@ -235,10 +262,15 @@ def walk(
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.

Returns:
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
return self.filesystem.walk(path=top) # type: ignore[no-any-return]
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files