diff --git a/docs/api/hub.polarisfs.md b/docs/api/hub.polarisfs.md deleted file mode 100644 index 5c372dc0..00000000 --- a/docs/api/hub.polarisfs.md +++ /dev/null @@ -1,5 +0,0 @@ -::: polaris.hub.polarisfs.PolarisFileSystem - options: - merge_init_into_class: true - filters: ["!^_"] ---- diff --git a/mkdocs.yml b/mkdocs.yml index 7a74a5fe..d7bb02fd 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -37,7 +37,6 @@ nav: - Hub: - Client: api/hub.client.md - External Auth Client: api/hub.external_client.md - - PolarisFileSystem: api/hub.polarisfs.md - Additional: - Dataset Factory: api/factory.md - Data Converters: api/converters.md diff --git a/polaris/hub/client.py b/polaris/hub/client.py index 32b2e036..55a5b144 100644 --- a/polaris/hub/client.py +++ b/polaris/hub/client.py @@ -1,11 +1,9 @@ import json -import ssl from hashlib import md5 from io import BytesIO from typing import get_args from urllib.parse import urljoin -import certifi import httpx import pandas as pd import zarr @@ -13,7 +11,7 @@ from authlib.integrations.httpx_client import OAuth2Client, OAuthError from authlib.oauth2 import OAuth2Error, TokenAuth from authlib.oauth2.rfc6749 import OAuth2Token -from httpx import HTTPStatusError, Response +from httpx import ConnectError, HTTPStatusError, Response from loguru import logger from typing_extensions import Self @@ -36,6 +34,7 @@ PolarisCreateArtifactError, PolarisHubError, PolarisRetrieveArtifactError, + PolarisSSLError, PolarisUnauthorizedError, ) from polaris.utils.types import ( @@ -189,42 +188,31 @@ def fetch_token(self, **kwargs): def _base_request_to_hub(self, url: str, method: str, **kwargs) -> Response: """Utility function since most API methods follow the same pattern""" - response = self.request(url=url, method=method, **kwargs) try: + response = self.request(url=url, method=method, **kwargs) response.raise_for_status() return response except HTTPStatusError as error: - response_status_code = response.status_code - - # With an internal server error, we are not sure the custom error-handling code on the hub is reached. - if response_status_code == 500: - raise - - # If JSON is included in the response body, we retrieve it and format it for output. If not, we fallback to - # retrieving plain text from the body. This is important for handling certain errors thrown from the backend - # which do not contain JSON in the response. + # If JSON is included in the response body, we retrieve it and format it for output. If not, we fall back to + # retrieving plain text from the body. 500 errors will not have a JSON response. try: - response = response.json() - response = json.dumps(response, indent=2, sort_keys=True) + response_text = error.response.json() + response_text = json.dumps(response_text, indent=2, sort_keys=True) except (json.JSONDecodeError, TypeError): - response = response.text - - # Providing the user a more helpful error message suggesting a re-login when their access - # credentials expire. - if response_status_code == 401: - raise PolarisUnauthorizedError(response=response) from error - - # The below two error cases can happen due to the JWT token containing outdated information. - # We therefore throw a custom error with a recommended next step. - if response_status_code == 403: - # This happens when trying to create an artifact for an owner the user has no access to. - raise PolarisCreateArtifactError(response=response) from error - - if response_status_code == 404: - # This happens when an artifact doesn't exist _or_ when the user has no access to that artifact. - raise PolarisRetrieveArtifactError(response=response) from error - - raise PolarisHubError(response=response) from error + response_text = error.response.text + + match error.response.status_code: + case 401: + # Providing the user a more helpful error message suggesting a re-login when their access credentials expire. + raise PolarisUnauthorizedError(response_text=response_text) from error + case 403: + # The error can happen due to the JWT token containing outdated information. + raise PolarisCreateArtifactError(response_text=response_text) from error + case 404: + # This happens when an artifact doesn't exist _or_ when the user has no access to that artifact. + raise PolarisRetrieveArtifactError(response_text=response_text) from error + case _: + raise PolarisHubError(response_text=response_text) from error def get_metadata_from_response(self, response: Response, key: str) -> str | None: """Get custom metadata saved to the R2 object from the headers.""" @@ -235,31 +223,14 @@ def request(self, method, url, withhold_token=False, auth=httpx.USE_CLIENT_DEFAU """Wraps the base request method to handle errors""" try: return super().request(method, url, withhold_token, auth, **kwargs) - except httpx.ConnectError as error: + except ConnectError as error: # NOTE (cwognum): In the stack trace, the more specific SSLCertVerificationError is raised. # We could use the traceback module to catch this specific error, but that would be overkill here. - if _HTTPX_SSL_ERROR_CODE in str(error): - raise ssl.SSLCertVerificationError( - "We could not verify the SSL certificate. " - f"Please ensure the installed version ({certifi.__version__}) of the `certifi` package is the latest. " - "If you require the usage of a custom CA bundle, you can set the POLARIS_CA_BUNDLE " - "environment variable to the path of your CA bundle. For debugging, you can temporarily disable " - "SSL verification by setting the POLARIS_CA_BUNDLE environment variable to `false`." - ) from error - raise error - except (MissingTokenError, InvalidTokenError, httpx.HTTPStatusError, OAuthError) as error: - if isinstance(error, httpx.HTTPStatusError) and error.response.status_code != 401: - raise - - # The `MissingTokenError`, `InvalidTokenError` and `OAuthError` errors from the AuthlibBaseError - # class do not hold the `response` attribute. To prevent a misleading `AttributeError` from - # being thrown, we conditionally set the error response below based on the error type. - if isinstance(error, httpx.HTTPStatusError): - error_response = error.response - else: - error_response = None - - raise PolarisUnauthorizedError(response=error_response) from error + error_string = str(error) + ExceptionClass = PolarisSSLError if _HTTPX_SSL_ERROR_CODE in error_string else PolarisHubError + raise ExceptionClass(error_string) from error + except (MissingTokenError, InvalidTokenError, OAuthError) as error: + raise PolarisUnauthorizedError() from error def login(self, overwrite: bool = False, auto_open_browser: bool = True): """Login to the Polaris Hub using the OAuth2 protocol. diff --git a/polaris/hub/polarisfs.py b/polaris/hub/polarisfs.py deleted file mode 100644 index 9bf2cb5a..00000000 --- a/polaris/hub/polarisfs.py +++ /dev/null @@ -1,250 +0,0 @@ -from hashlib import md5 -from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union - -import fsspec -from loguru import logger - -from polaris.utils.errors import PolarisChecksumError, PolarisHubError -from polaris.utils.misc import slugify -from polaris.utils.types import TimeoutTypes - -if TYPE_CHECKING: - from polaris.hub.client import PolarisHubClient - - -class PolarisFileSystem(fsspec.AbstractFileSystem): - """ - A file system interface for accessing datasets on the Polaris platform. - - This class extends `fsspec.AbstractFileSystem` and provides methods to list objects within a Polaris dataset - and fetch the content of a file from the dataset. - - Note: Zarr Integration - This file system can be used with Zarr to load multidimensional array data stored in a Dataset from - the Polaris infrastructure. This class is needed because we otherwise cannot generate signed URLs for - folders and Zarr is a folder based data-format. - - ```python - fs = PolarisFileSystem(...) - store = zarr.storage.FSStore(..., fs=polaris_fs) - root = zarr.open(store, mode="r") - ``` - - Args: - polaris_client: The Polaris Hub client used to make API requests. - dataset_owner: The owner of the dataset. - dataset_name: The name of the dataset. - """ - - sep = "/" - protocol = "polarisfs" - async_impl = False - - def __init__( - self, - polaris_client: "PolarisHubClient", - dataset_owner: str, - dataset_name: str, - **kwargs: dict, - ): - super().__init__(use_listing_cache=True, listings_expiry_time=None, max_paths=None, **kwargs) - - self.polaris_client = polaris_client - self.default_timeout = self.polaris_client.settings.default_timeout - - # Prefix to remove from ls entries - self.prefix = f"dataset/{dataset_owner}/{slugify(dataset_name)}/" - # Base path for uploading. Please pay attention on path version update. - self.base_path = f"/v1/storage/{self.prefix.rstrip('/')}" - - @staticmethod - def is_polarisfs_path(path: str) -> bool: - """Check if the given path is a PolarisFS path. - - Args: - path: The path to check. - - Returns: - True if the path is a PolarisFS path; otherwise, False. - """ - return path.startswith(f"{PolarisFileSystem.protocol}://") - - def ls( - self, - path: str, - detail: bool = False, - timeout: Optional[TimeoutTypes] = None, - **kwargs: dict, - ) -> Union[List[str], List[Dict[str, Any]]]: - """List objects in the specified path within the Polaris dataset. - - Args: - path: The path within the dataset to list objects. - detail: If True, returns detailed information about each object. - timeout: Maximum time (in seconds) to wait for the request to complete. - - Returns: - A list of dictionaries if detail is True; otherwise, a list of object names. - """ - if timeout is None: - timeout = self.default_timeout - - ls_path = self.sep.join([self.base_path, "ls", path]) - - # GET request to Polaris Hub to list objects in path - response = self.polaris_client.get(ls_path.rstrip("/"), timeout=timeout) - response.raise_for_status() - - detailed_listings = [ - { - "name": p["name"].removeprefix(self.prefix), - "size": p["size"], - "type": p["type"], - } - for p in response.json() - ] - - self.dircache[path] = detailed_listings - - if not detail: - return [p["name"] for p in detailed_listings] - - return detailed_listings - - def cat_file( - self, - path: str, - start: Union[int, None] = None, - end: Union[int, None] = None, - timeout: Optional[TimeoutTypes] = None, - **kwargs: dict, - ) -> bytes: - """Fetches and returns the content of a file from the Polaris dataset. - - Args: - path: The path to the file within the dataset. - start: The starting index of the content to retrieve. - end: The ending index of the content to retrieve. - timeout: Maximum time (in seconds) to wait for the request to complete. - kwargs: Extra arguments passed to `fsspec.open()` - - Returns: - The content of the requested file. - """ - if timeout is None: - timeout = self.default_timeout - - cat_path = self.sep.join([self.base_path, path]) - - # GET request to Polaris Hub for signed URL of file - response = self.polaris_client.get(cat_path) - - # This should be a 307 redirect with the signed URL - if response.status_code != 307: - raise PolarisHubError(message="Could not get signed URL from Polaris Hub.", response=response) - - hub_response_body = response.json() - signed_url = hub_response_body["url"] - - headers = {"Content-Type": "application/octet-stream", **hub_response_body["headers"]} - - response = self.polaris_client.request( - url=signed_url, - method="GET", - auth=None, - headers=headers, - timeout=timeout, - ) - response.raise_for_status() - response_content = response.content - - # Verify the checksum on download - expected_md5sum = self.polaris_client.get_metadata_from_response(response, "md5sum") - if expected_md5sum is None: - raise PolarisChecksumError("MD5 checksum not found in response headers.") - logger.debug(f"MD5 checksum found in response headers: {expected_md5sum}.") - - md5sum = md5(response_content).hexdigest() - logger.debug(f"MD5 checksum computed for response content: {md5sum}.") - - if md5sum != expected_md5sum: - raise PolarisChecksumError( - f"MD5 checksum verification failed. Expected {expected_md5sum}, got {md5sum}." - ) - - return response_content[start:end] - - def rm(self, path: str, recursive: bool = False, maxdepth: Optional[int] = None) -> None: - """Remove a file or directory from the Polaris dataset. - - This method is provided for compatibility with the Zarr storage interface. - It may be called by the Zarr store when removing a file or directory. - - Args: - path: The path to the file or directory to be removed. - recursive: If True, remove directories and their contents recursively. - maxdepth: The maximum depth to recurse when removing directories. - - Returns: - None - - Note: - This method currently it does not perform any removal operations and is included - as a placeholder that aligns with the Zarr interface's expectations. - """ - raise NotImplementedError("PolarisFS does not currently support the file removal operation.") - - def pipe_file( - self, - path: str, - content: Union[bytes, str], - timeout: Optional[TimeoutTypes] = None, - **kwargs: dict, - ) -> None: - """Pipes the content of a file to the Polaris dataset. - - Args: - path: The path to the file within the dataset. - content: The content to be piped into the file. - timeout: Maximum time (in seconds) to wait for the request to complete. - - Returns: - None - """ - if timeout is None: - timeout = self.default_timeout - - pipe_path = self.sep.join([self.base_path, path]) - - # PUT request to Polaris Hub to put object in path - response = self.polaris_client.put(pipe_path, timeout=timeout) - - if response.status_code != 307: - raise PolarisHubError(message="Could not get signed URL from Polaris Hub.", response=response) - - hub_response_body = response.json() - signed_url = hub_response_body["url"] - - headers = {"Content-Type": "application/octet-stream", **hub_response_body["headers"]} - - response = self.polaris_client.request( - url=signed_url, - method="PUT", - auth=None, - content=content, - headers=headers, - timeout=timeout, - ) - response.raise_for_status() - - new_listing = [{"name": path, "type": "file", "size": len(content)}] - try: - current_listings = self._ls_from_cache(self._parent(path)) - new_listing.extend(current_listings) - except FileNotFoundError: - # self._ls_from_cache() raises FileNotFoundError when no listings - # are found within a path. We can then assume it's a new directory - # and new_listing can be used as is. - pass - - self.dircache[self._parent(path)] = new_listing diff --git a/polaris/utils/errors.py b/polaris/utils/errors.py index 4a71eaa6..f647f63d 100644 --- a/polaris/utils/errors.py +++ b/polaris/utils/errors.py @@ -1,4 +1,4 @@ -from httpx import Response +import certifi class InvalidDatasetError(ValueError): @@ -48,43 +48,53 @@ class PolarisHubError(Exception): YELLOW = "\033[93m" _END_CODE = "\033[0m" - def __init__(self, message: str = "", response: Response | None = None): - prefix = "The request to the Polaris Hub failed." - - if response is not None: - prefix += f" The Hub responded with:\n{response}" - - super().__init__("\n".join([prefix, message])) - - def format(self, text: str, codes: str | list[str]): - if not isinstance(codes, list): - codes = [codes] + def __init__(self, message: str = "", response_text: str = ""): + parts = filter( + bool, + [ + f"{self.BOLD}The request to the Polaris Hub has failed.{self._END_CODE}", + f"{self.YELLOW}{message}{self._END_CODE}" if message else "", + f"----------------------\nError reported was:\n{response_text}" if response_text else "", + ], + ) - return "".join(codes) + text + self._END_CODE + super().__init__("\n".join(parts)) class PolarisUnauthorizedError(PolarisHubError): - def __init__(self, response: Response | None = None): + def __init__(self, response_text: str = ""): message = ( "You are not logged in to Polaris or your login has expired. " "You can use the Polaris CLI to easily authenticate yourself again with `polaris login --overwrite`." ) - super().__init__(message, response) + super().__init__(message, response_text) class PolarisCreateArtifactError(PolarisHubError): - def __init__(self, response: Response | None = None): + def __init__(self, response_text: str = ""): message = ( "Note: If you can confirm that you are authorized to perform this action, " "please call 'polaris login --overwrite' and try again. If the issue persists, please reach out to the Polaris team for support." ) - super().__init__(message, response) + super().__init__(message, response_text) class PolarisRetrieveArtifactError(PolarisHubError): - def __init__(self, response: Response | None = None): + def __init__(self, response_text: str = ""): message = ( "Note: If this artifact exists and you can confirm that you are authorized to retrieve it, " "please call 'polaris login --overwrite' and try again. If the issue persists, please reach out to the Polaris team for support." ) - super().__init__(message, response) + super().__init__(message, response_text) + + +class PolarisSSLError(PolarisHubError): + def __init__(self, response_text: str = ""): + message = ( + "We could not verify the SSL certificate. " + f"Please ensure the installed version ({certifi.__version__}) of the `certifi` package is the latest. " + "If you require the usage of a custom CA bundle, you can set the POLARIS_CA_BUNDLE " + "environment variable to the path of your CA bundle. For debugging, you can temporarily disable " + "SSL verification by setting the POLARIS_CA_BUNDLE environment variable to `false`." + ) + super().__init__(message, response_text) diff --git a/pyproject.toml b/pyproject.toml index 8130efda..ec11e58f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,10 +125,10 @@ omit = [ # We cannot yet test the interaction with the Hub. # See e.g. https://github.com/polaris-hub/polaris/issues/30 "polaris/hub/client.py", - "polaris/hub/external_auth_client.py", - "polaris/hub/oauth2.py", + "polaris/hub/external_client.py", "polaris/hub/settings.py", - "polaris/hub/polarisfs.py", + "polaris/hub/oauth.py", + "polaris/hub/storage.py", "polaris/hub/__init__.py", ] diff --git a/uv.lock b/uv.lock index f375b89f..53bce2a7 100644 --- a/uv.lock +++ b/uv.lock @@ -2237,7 +2237,6 @@ wheels = [ [[package]] name = "polaris-lib" -version = "0.11.4.dev0+g1649472.d20250123" source = { editable = "." } dependencies = [ { name = "authlib" },