Skip to content

Commit

Permalink
Python Wrapper: Implement Object Create No Pre-Sign (#6985)
Browse files Browse the repository at this point in the history
* Python Wrapper: Implement Object Create No Pre-Sign

* CR Fixes

* CR Fixes 2

* CR Fixes 3

* CR Fixes 4

* CR Fixes

* CR Fixes2
  • Loading branch information
N-o-Z committed Nov 15, 2023
1 parent 54c263b commit 48470a8
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 110 deletions.
83 changes: 46 additions & 37 deletions clients/python-wrapper/lakefs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,50 @@
"""

from typing import Optional
import requests
from requests.auth import HTTPBasicAuth
from __future__ import annotations
from typing import Optional, NamedTuple

import lakefs_sdk
from lakefs_sdk.client import LakeFSClient

from lakefs.config import ClientConfig
from lakefs.exceptions import NoAuthenticationFound
from lakefs.exceptions import NoAuthenticationFound, NotAuthorizedException, ServerException

# global default client
DefaultClient: Optional[Client] = None


class ServerStorageConfiguration(NamedTuple):
    """
    Immutable, tuple-backed record holding the storage section of a lakeFS server configuration.

    Field order is part of the contract (instances may be built positionally), and every field
    except ``default_namespace_prefix`` is mandatory.
    """

    # Blockstore backend identifier
    blockstore_type: str
    # Capability flags
    pre_sign_support: bool
    import_support: bool
    # Namespace helpers (an example value and a validation pattern)
    blockstore_namespace_example: str
    blockstore_namespace_validity_regex: str
    pre_sign_support_ui: bool
    import_validity_regex: str
    # The only optional field: falls back to None when omitted
    default_namespace_prefix: Optional[str] = None


class ServerConfiguration:
    """
    Snapshot of the lakeFS server configuration, fetched once at construction time.

    The constructor calls ``client.sdk_client.config_api.get_config()`` and keeps the result;
    the storage section is additionally converted into a ``ServerStorageConfiguration``.
    """

    _conf: lakefs_sdk.Config
    _storage_conf: ServerStorageConfiguration

    def __init__(self, client: Optional[Client] = None):
        """
        Fetch the server configuration using the given client.

        :param client: The client to query the server with. When omitted, the module-level
            ``DefaultClient`` is used.
        :raises NotAuthorizedException: if the server rejects the request as unauthorized
        :raises ServerException: for any other API error reported by the SDK
        """
        # Resolve the default at call time. A `client=DefaultClient` parameter default is bound
        # when the class body is executed, at which point the module global is still None, so
        # the real default client would never be picked up.
        if client is None:
            client = DefaultClient

        try:
            self._conf = client.sdk_client.config_api.get_config()
            # NOTE(review): relies on the SDK storage config attributes matching the
            # ServerStorageConfiguration field names one-to-one - confirm on SDK upgrades.
            self._storage_conf = ServerStorageConfiguration(**self._conf.storage_config.__dict__)
        except lakefs_sdk.exceptions.UnauthorizedException as e:
            # Must precede the ApiException handler: the unauthorized error is a subclass of
            # it, and testing for ApiException itself would classify every failure as an
            # authorization problem.
            raise NotAuthorizedException(e.status, e.reason) from e
        except lakefs_sdk.exceptions.ApiException as e:
            raise ServerException(e.status, e.reason) from e

    @property
    def version(self) -> str:
        """
        The version string taken from the fetched configuration's ``version_config``.
        """
        return self._conf.version_config.version

    @property
    def storage_config(self) -> ServerStorageConfiguration:
        """
        The storage configuration captured at construction time.
        """
        return self._storage_conf


class Client:
Expand All @@ -26,36 +61,13 @@ class Client:
"""

_client: Optional[LakeFSClient] = None
http_client: Optional[requests.Session] = None
_conf: Optional[ClientConfig] = None
_server_conf: Optional[lakefs_sdk.Config] = None
_server_conf: Optional[ServerConfiguration] = None

def __init__(self, **kwargs):
self._conf = ClientConfig(**kwargs)
self._client = LakeFSClient(self._conf.configuration)

# Set up http client
config = self._conf.configuration
headers = {}
auth = None
if config.access_token is not None:
# TODO: Create custom auth class and inherit from BaseAuth
headers["Authorization"] = f"Bearer {config.access_token}"

if config.username is not None and config.password is not None:
auth = HTTPBasicAuth(config.username, config.password)

self.http_client = requests.Session()
self.http_client.headers = headers
self.http_client.auth = auth

def close(self):
"""
Closes any open connections
"""
if self.http_client is not None:
self.http_client.close()

@property
def config(self):
"""
Expand All @@ -76,21 +88,18 @@ def storage_config(self):
lakeFS SDK storage config object, lazy evaluated.
"""
if self._server_conf is None:
self._server_conf = self._client.config_api.get_config()
return lakefs_sdk.StorageConfig(**self._server_conf.storage_config.__dict__)
self._server_conf = ServerConfiguration(self)
return self._server_conf.storage_config

@property
def version_config(self) -> lakefs_sdk.VersionConfig:
def version(self) -> str:
"""
lakeFS SDK version config object, lazy evaluated.
lakeFS Server version, lazy evaluated.
"""
if self._server_conf is None:
self._server_conf = self._client.config_api.get_config()
return lakefs_sdk.VersionConfig(**self._server_conf.version_config.__dict__)

self._server_conf = ServerConfiguration(self)
return self._server_conf.version

# global default client
DefaultClient: Optional[Client] = None

try:
DefaultClient = Client()
Expand Down
74 changes: 56 additions & 18 deletions clients/python-wrapper/lakefs/object_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from lakefs.client import Client, DefaultClient
from lakefs.exceptions import (
ObjectExistsException,
UnsupportedOperationException,
ObjectNotFoundException,
PermissionException,
ServerException
)

_RANGE_STR_TMPL = "bytes={start}-{end}"
_LAKEFS_METADATA_PREFIX = "x-lakefs-meta-"

# Type to support both strings and bytes in addition to streams (reference: httpx._types.RequestContent)
UploadContentType = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
Expand Down Expand Up @@ -182,15 +182,16 @@ def create(self,
if 'x' in mode and self.exists(): # Requires explicit create
raise ObjectExistsException

# TODO: handle streams
binary_mode = 'b' in mode
if binary_mode and isinstance(data, str):
content = data.encode('utf-8')
elif not binary_mode and isinstance(data, bytes):
content = data.decode('utf-8')
# TODO: handle streams
is_presign = pre_sign if pre_sign is not None else self.pre_sign
try:
stats = self._upload(content, is_presign, content_type, metadata)
stats = self._upload_presign(content, content_type, metadata) if is_presign \
else self._upload_raw(content, content_type, metadata)
self._stat = ObjectStats(**stats.__dict__)
except lakefs_sdk.exceptions.ApiException as e:
_handle_api_exception(e)
Expand Down Expand Up @@ -223,29 +224,66 @@ def _extract_etag_from_response(headers) -> str:
etag = headers.get("ETag", "").strip(' "')
return etag

def _upload(self, content, pre_sign, content_type: Optional[str] = None,
metadata: Optional[dict[str, str]] = None) -> ObjectStats:
if not pre_sign:
raise UnsupportedOperationException("Upload currently supported only in pre-sign mode")

headers = {}
if content_type is not None:
headers["Content-Type"] = content_type
def _upload_raw(self, content: UploadContentType, content_type: Optional[str] = None,
                metadata: Optional[dict[str, str]] = None) -> ObjectStats:
    """
    Use raw upload API call to bypass validation of content parameter

    Issues a POST to the objects endpoint of this object's repository/branch through the SDK's
    generic ``call_api`` entry point, sending ``content`` as the request body.

    :param content: The data to upload
    :param content_type: Value for the Content-Type header; "application/octet-stream" is used
        when not provided
    :param metadata: Optional user metadata; each key is sent as a header prefixed with
        ``_LAKEFS_METADATA_PREFIX``
    :return: The deserialized response of the upload call (mapped to "ObjectStats" on 201)
    """
    headers = {
        "Accept": "application/json",
        "Content-Type": content_type if content_type is not None else "application/octet-stream"
    }

    # Create user metadata headers.
    # metadata defaults to None, so it must be guarded before iterating.
    if metadata is not None:
        for k, v in metadata.items():
            headers[_LAKEFS_METADATA_PREFIX + k] = v

    # Maps HTTP status codes to the SDK model name used to deserialize the response body
    _response_types_map = {
        '201': "ObjectStats",
        '400': "Error",
        '401': "Error",
        '403': "Error",
        '404': "Error",
        '412': "Error",
        '420': None,
    }
    # authentication setting
    _auth_settings = ['basic_auth', 'cookie_auth', 'oidc_auth', 'saml_auth', 'jwt_token']
    return self._client.sdk_client.objects_api.api_client.call_api(
        resource_path='/repositories/{repository}/branches/{branch}/objects',
        method='POST',
        path_params={"repository": self._repo, "branch": self._ref},
        query_params={"path": self._path},
        header_params=headers,
        body=content,
        response_types_map=_response_types_map,
        auth_settings=_auth_settings,
        _return_http_data_only=True
    )

def _upload_presign(self, content: UploadContentType, content_type: Optional[str] = None,
metadata: Optional[dict[str, str]] = None) -> ObjectStats:
headers = {
"Accept": "application/json",
"Content-Type": content_type if content_type is not None else "application/octet-stream"
}

staging_location = self._client.sdk_client.staging_api.get_physical_address(self._repo,
self._ref,
self._path,
pre_sign)
True)
url = staging_location.presigned_url
if self._client.storage_config.blockstore_type == "azure":
headers["x-ms-blob-type"] = "BlockBlob"

resp = self._client.http_client.put(url,
data=content,
headers=headers,
auth=None) # Explicitly remove default client authentication
resp.raise_for_status()
etag = WriteableObject._extract_etag_from_response(resp.headers)
resp = self._client.sdk_client.objects_api.api_client.request(
method="PUT",
url=url,
body=content,
headers=headers
)

etag = WriteableObject._extract_etag_from_response(resp.getheaders())
staging_metadata = StagingMetadata(staging=staging_location,
size_bytes=len(content),
checksum=etag,
Expand Down
46 changes: 0 additions & 46 deletions clients/python-wrapper/lakefs/server_config.py

This file was deleted.

4 changes: 2 additions & 2 deletions clients/python-wrapper/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
setuptools==68.2.2
lakefs-sdk~=1.1.0
requests~=2.31.0
pyyaml~=6.0.1
urllib3~=2.0.7
pyyaml~=6.0.1
4 changes: 2 additions & 2 deletions clients/python-wrapper/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
python_requires=">=3.9",
install_requires=REQUIRES,
tests_require={
"dev": ["pytest ~= 7.4.3", "pytest-mock ~= 3.12.0"]},
packages=find_packages(exclude=["tests", "system-tests"]),
"dev": ["pytest ~= 7.4.3"]},
packages=find_packages(exclude=["tests"]),
include_package_data=True,
license="Apache 2.0",
long_description=long_description,
Expand Down
9 changes: 4 additions & 5 deletions clients/python-wrapper/tests/utests/test_object_io.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import http
from contextlib import contextmanager
import requests
import urllib3

import lakefs_sdk.api
from lakefs_sdk.rest import RESTResponse

from tests.utests.common import get_test_client

Expand Down Expand Up @@ -156,9 +156,8 @@ def test_create(self, monkeypatch, tmp_path):
with writeable_object_context(monkeypatch, **test_kwargs.__dict__) as obj:
staging_location = StagingTestLocation()
monkeypatch.setattr(lakefs_sdk.api.StagingApi, "get_physical_address", lambda *args: staging_location)
resp = requests.Response()
resp.status_code = http.HTTPStatus.OK.value
monkeypatch.setattr(requests.Session, "put", lambda *args, **kwargs: resp)
monkeypatch.setattr(obj._client.sdk_client.objects_api.api_client, "request",
lambda *args, **kwargs: RESTResponse(urllib3.response.HTTPResponse()))

def monkey_link_physical_address(*_, staging_metadata: lakefs_sdk.StagingMetadata, **__):
assert staging_metadata.size_bytes == len(data)
Expand Down

0 comments on commit 48470a8

Please sign in to comment.