From bd9800bd48221d1a81da6af516aaae1bf59e4696 Mon Sep 17 00:00:00 2001
From: Julien St-Laurent
Date: Mon, 13 Nov 2023 15:51:37 -0500
Subject: [PATCH 1/2] fix: Update the file size and checksum set in the
 tableContent payload attribute to use those of the Parquet file to be
 uploaded

---
 polaris/_artifact.py  |  7 ++++++-
 polaris/hub/client.py | 44 ++++++++++++++++++++++++--------------------
 2 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/polaris/_artifact.py b/polaris/_artifact.py
index 364591c9..cdc3f9fe 100644
--- a/polaris/_artifact.py
+++ b/polaris/_artifact.py
@@ -2,7 +2,7 @@
 from typing import Dict, Optional, Union
 
 import fsspec
-from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_serializer, field_validator
+from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, computed_field, field_serializer, field_validator
 from pydantic.alias_generators import to_camel
 
 from polaris.utils.types import HubOwner, SlugCompatibleStringType
@@ -38,6 +38,11 @@ class BaseArtifactModel(BaseModel):
     owner: Optional[HubOwner] = None
     _verified: bool = PrivateAttr(False)
 
+    @computed_field
+    @property
+    def artifact_id(self) -> Optional[str]:
+        return f"{self.owner.slug}/{self.name}" if self.owner and self.name else None
+
     @field_serializer("owner")
     def _serialize_owner(self, value: HubOwner) -> Union[str, None]:
         return self.owner.slug if self.owner else None
diff --git a/polaris/hub/client.py b/polaris/hub/client.py
index 4751780b..d911ea5a 100644
--- a/polaris/hub/client.py
+++ b/polaris/hub/client.py
@@ -3,6 +3,7 @@
 import ssl
 import sys
 import webbrowser
+from hashlib import md5, sha256
 from io import BytesIO
 from typing import Callable, Optional, Union
 from urllib.parse import urljoin
@@ -143,7 +144,7 @@ def _base_request_to_hub(self, url: str, method: str, **kwargs):
                 f"The request to the Polaris Hub failed. See the error message below for more details:\n{response}"
             ) from error
 
-        # Convert the reponse to json format if the reponse contains a 'text' body
+        # Convert the response to json format if the response contains a 'text' body
         try:
             response = response.json()
         except json.JSONDecodeError:
@@ -300,7 +301,7 @@ def list_datasets(self, limit: int = 100, offset: int = 0) -> list[str]:
         response = self._base_request_to_hub(
            url="/dataset", method="GET", params={"limit": limit, "offset": offset}
         )
-        dataset_list = [f"{HubOwner(**bm['owner'])}/{bm['name']}" for bm in response["data"]]
+        dataset_list = [bm['artifactId'] for bm in response["data"]]
         return dataset_list
 
     def get_dataset(self, owner: Union[str, HubOwner], name: str) -> Dataset:
@@ -443,30 +444,33 @@ def upload_dataset(
         # 2. Upload the parquet file to the hub
         # TODO: Revert step 1 in case step 2 fails - Is this needed? Or should this be taken care of by the hub?
 
-        # Step 1: Upload meta-data
-        # Instead of directly uploading the table, we announce to the hub that we intend to upload one.
-        dataset_json["tableContent"] = {
-            "size": sys.getsizeof(dataset.table),
-            "fileType": "parquet",
-            "md5sum": dataset._compute_checksum(dataset.table),
-            "url": urljoin(
-                self.settings.hub_url, f"/storage/dataset/{dataset.owner}/{dataset.name}/table.parquet"
-            ),
-        }
-        dataset_json["access"] = access
-        url = f"/dataset/{dataset.owner}/{dataset.name}"
-        response = self._base_request_to_hub(url=url, method="PUT", json=dataset_json)
-
-        # Step 2: Upload the parquet file
         # Write the parquet file directly to a buffer
         buffer = BytesIO()
         dataset.table.to_parquet(buffer, engine="auto")
+        parquet_size = len(buffer.getbuffer())
+        parquet_md5 = md5(buffer.getbuffer()).hexdigest()
 
+        # Step 1: Upload meta-data
+        # Instead of directly uploading the table, we announce to the hub that we intend to upload one.
+        url = f"/dataset/{dataset.artifact_id}"
+        response = self._base_request_to_hub(url=url, method="PUT", json={
+            "tableContent": {
+                "size": parquet_size,
+                "fileType": "parquet",
+                "md5sum": parquet_md5,
+            },
+            "access": access,
+            **dataset_json,
+        })
+
+        # Step 2: Upload the parquet file
         # create an empty PUT request to get the table content URL from cloudflare
         hub_response = self.request(
-            url=dataset_json["tableContent"]["url"],
+            url=response["tableContent"]["url"],
             method="PUT",
-            headers={"Content-type": "application/vnd.apache.parquet"},
+            headers={
+                "Content-type": "application/vnd.apache.parquet",
+            },
         )
 
         if hub_response.status_code == 307:
@@ -517,7 +521,7 @@ def upload_benchmark(self, benchmark: BenchmarkSpecification, access: AccessType
 
         # Get the serialized data-model
         # We exclude the dataset as we expect it to exist on the hub already.
-        benchmark_json = benchmark.model_dump(exclude=["dataset"], exclude_none=True, by_alias=True)
+        benchmark_json = benchmark.model_dump(exclude={"dataset"}, exclude_none=True, by_alias=True)
         benchmark_json["datasetArtifactId"] = f"{benchmark.dataset.owner}/{benchmark.dataset.name}"
         benchmark_json["access"] = access
 

From 3b10870f17e0e253d611e0494391108f87a8a290 Mon Sep 17 00:00:00 2001
From: Julien St-Laurent
Date: Mon, 13 Nov 2023 15:58:43 -0500
Subject: [PATCH 2/2] chore: Lint and checks

---
 polaris/_artifact.py  | 10 +++++++++-
 polaris/hub/client.py | 25 ++++++++++++++-----------
 2 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/polaris/_artifact.py b/polaris/_artifact.py
index cdc3f9fe..34fc2e4e 100644
--- a/polaris/_artifact.py
+++ b/polaris/_artifact.py
@@ -2,7 +2,15 @@
 from typing import Dict, Optional, Union
 
 import fsspec
-from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, computed_field, field_serializer, field_validator
+from pydantic import (
+    BaseModel,
+    ConfigDict,
+    Field,
+    PrivateAttr,
+    computed_field,
+    field_serializer,
+    field_validator,
+)
 from pydantic.alias_generators import to_camel
 
 from polaris.utils.types import HubOwner, SlugCompatibleStringType
diff --git a/polaris/hub/client.py b/polaris/hub/client.py
index d911ea5a..ef7785c4 100644
--- a/polaris/hub/client.py
+++ b/polaris/hub/client.py
@@ -1,9 +1,8 @@
 import json
 import os
 import ssl
-import sys
 import webbrowser
-from hashlib import md5, sha256
+from hashlib import md5
 from io import BytesIO
 from typing import Callable, Optional, Union
 from urllib.parse import urljoin
@@ -301,7 +300,7 @@ def list_datasets(self, limit: int = 100, offset: int = 0) -> list[str]:
         response = self._base_request_to_hub(
             url="/dataset", method="GET", params={"limit": limit, "offset": offset}
         )
-        dataset_list = [bm['artifactId'] for bm in response["data"]]
+        dataset_list = [bm["artifactId"] for bm in response["data"]]
         return dataset_list
 
     def get_dataset(self, owner: Union[str, HubOwner], name: str) -> Dataset:
@@ -453,15 +452,19 @@ def upload_dataset(
         # Step 1: Upload meta-data
         # Instead of directly uploading the table, we announce to the hub that we intend to upload one.
         url = f"/dataset/{dataset.artifact_id}"
-        response = self._base_request_to_hub(url=url, method="PUT", json={
-            "tableContent": {
-                "size": parquet_size,
-                "fileType": "parquet",
-                "md5sum": parquet_md5,
+        response = self._base_request_to_hub(
+            url=url,
+            method="PUT",
+            json={
+                "tableContent": {
+                    "size": parquet_size,
+                    "fileType": "parquet",
+                    "md5sum": parquet_md5,
+                },
+                "access": access,
+                **dataset_json,
             },
-            "access": access,
-            **dataset_json,
-        })
+        )
 
         # Step 2: Upload the parquet file
         # create an empty PUT request to get the table content URL from cloudflare
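
A note on the core fix in patch 1: `sys.getsizeof(dataset.table)` measures the Python object's in-memory footprint, not the size of the serialized Parquet file, and the old checksum was computed from the in-memory table rather than the uploaded bytes, so the `size` and `md5sum` announced to the hub could not match what the storage backend actually received. The patch therefore serializes the table into an in-memory buffer first and derives both values from those exact bytes. Below is a minimal standalone sketch of that technique, assuming pandas plus a Parquet engine (pyarrow or fastparquet) is installed; the sample `table` contents are illustrative, not taken from the polaris codebase:

    from hashlib import md5
    from io import BytesIO

    import pandas as pd

    # Illustrative stand-in for dataset.table
    table = pd.DataFrame({"smiles": ["CCO", "CCN"], "target": [0.3, 0.7]})

    # Serialize to Parquet in memory; these are the exact bytes a client would upload
    buffer = BytesIO()
    table.to_parquet(buffer, engine="auto")

    # Compute size and checksum from the serialized bytes, so they match,
    # byte-for-byte, what the storage backend receives
    parquet_size = len(buffer.getbuffer())
    parquet_md5 = md5(buffer.getbuffer()).hexdigest()

    print(f"size={parquet_size}, md5={parquet_md5}")

This is also why the patch reorders the two upload steps: the metadata announcement to `/dataset/{artifact_id}` can only carry the true size and checksum once the Parquet buffer exists, so serialization now happens before the PUT rather than after it.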