From 1e59d03232b156d310f20fa975aec1a89c5d7a61 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Wed, 26 Feb 2025 10:49:46 -0500 Subject: [PATCH 01/13] Use Opendal driver in Http driver --- .../jumpstarter_driver_http/client.py | 61 +------------- .../jumpstarter_driver_http/driver.py | 83 +------------------ .../jumpstarter_driver_http/driver_test.py | 69 ++++++--------- .../jumpstarter-driver-http/pyproject.toml | 1 + uv.lock | 2 + 5 files changed, 33 insertions(+), 183 deletions(-) diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py index e541f3a90..2f5ed37d3 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py @@ -1,14 +1,10 @@ from dataclasses import dataclass -from pathlib import Path -from jumpstarter_driver_opendal.adapter import OpendalAdapter -from opendal import Operator - -from jumpstarter.client import DriverClient +from jumpstarter_driver_composite.client import CompositeClient @dataclass(kw_only=True) -class HttpServerClient(DriverClient): +class HttpServerClient(CompositeClient): """Client for the HTTP server driver""" def start(self): @@ -30,59 +26,6 @@ def stop(self): """ self.call("stop") - def list_files(self) -> list[str]: - """ - List all files in the HTTP server's root directory. - - Returns: - list[str]: A list of filenames present in the HTTP server's root directory - """ - return self.call("list_files") - - def put_file(self, filename: str, src_stream): - """ - Upload a file to the HTTP server using a streamed source. - - Args: - filename (str): Name to save the file as on the server. - src_stream: Stream/source to read the file data from. - - Returns: - str: URL of the uploaded file - """ - return self.call("put_file", filename, src_stream) - - def put_local_file(self, filepath: str) -> str: - """ - Upload a file from the local filesystem to the HTTP server. - - Note: This doesn't use HTTP to upload; it streams the file content directly. - - Args: - filepath (str): Path to the local file to upload. - - Returns: - str: Name of the uploaded file - - Example: - >>> client.put_local_file("/path/to/local/file.txt") - """ - absolute = Path(filepath).resolve() - with OpendalAdapter(client=self, operator=Operator("fs", root="/"), path=str(absolute), mode="rb") as handle: - return self.call("put_file", absolute.name, handle) - - def delete_file(self, filename: str) -> str: - """ - Delete a file from the HTTP server. - - Args: - filename (str): Name of the file to delete. - - Returns: - str: Name of the deleted file - """ - return self.call("delete_file", filename) - def get_host(self) -> str: """ Get the host IP address the HTTP server is listening on. diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py index 7c8c4a6ff..7db8623db 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py @@ -1,11 +1,10 @@ import os from dataclasses import dataclass, field -from pathlib import Path from typing import Optional import anyio from aiohttp import web -from anyio.streams.file import FileWriteStream +from jumpstarter_driver_opendal.driver import Opendal from jumpstarter.driver import Driver, export @@ -34,6 +33,8 @@ def __post_init__(self): super().__post_init__() os.makedirs(self.root_dir, exist_ok=True) + + self.children["storage"] = Opendal(scheme="fs", kwargs={"root": self.root_dir}) self.app.router.add_routes( [ web.static("/", self.root_dir), @@ -58,84 +59,6 @@ def client(cls) -> str: """Return the import path of the corresponding client""" return "jumpstarter_driver_http.client.HttpServerClient" - @export - async def put_file(self, filename: str, src_stream) -> str: - """ - Upload a file to the HTTP server. - - Args: - filename (str): Name of the file to upload. - src_stream: Stream of file content. - - Returns: - str: Name of the uploaded file. - - Raises: - HttpServerError: If the target path is invalid. - FileWriteError: If the file upload fails. - """ - try: - file_path = os.path.join(self.root_dir, filename) - - if not Path(file_path).resolve().is_relative_to(Path(self.root_dir).resolve()): - raise HttpServerError("Invalid target path") - - async with await FileWriteStream.from_path(file_path) as dst: - async with self.resource(src_stream, timeout=self.timeout) as src: - async for chunk in src: - await dst.send(chunk) - - self.logger.info(f"File '{filename}' written to '{file_path}'") - return f"{self.get_url()}/{filename}" - - except Exception as e: - self.logger.error(f"Failed to upload file '{filename}': {e}") - raise FileWriteError(f"Failed to upload file '{filename}': {e}") from e - - @export - async def delete_file(self, filename: str) -> str: - """ - Delete a file from the HTTP server. - - Args: - filename (str): Name of the file to delete. - - Returns: - str: Name of the deleted file. - - Raises: - HttpServerError: If the file does not exist or deletion fails. - """ - file_path = Path(self.root_dir) / filename - if not file_path.exists(): - raise HttpServerError(f"File '{filename}' does not exist.") - try: - file_path.unlink() - self.logger.info(f"File '{filename}' has been deleted.") - return filename - except Exception as e: - self.logger.error(f"Failed to delete file '{filename}': {e}") - raise HttpServerError(f"Failed to delete file '{filename}': {e}") from e - - @export - def list_files(self) -> list[str]: - """ - List all files in the root directory. - - Returns: - list[str]: List of filenames in the root directory. - - Raises: - HttpServerError: If listing files fails. - """ - try: - files = os.listdir(self.root_dir) - files = [f for f in files if os.path.isfile(os.path.join(self.root_dir, f))] - return files - except Exception as e: - self.logger.error(f"Failed to list files: {e}") - raise HttpServerError(f"Failed to list files: {e}") from e - @export async def start(self): """ diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py index b0dc15aec..118746b8d 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py @@ -1,14 +1,8 @@ -import os -import uuid -from tempfile import TemporaryDirectory - import aiohttp -import anyio import pytest -from anyio import create_memory_object_stream from .driver import HttpServer -from jumpstarter.common.resources import ClientStreamResource +from jumpstarter.common.utils import serve @pytest.fixture @@ -17,44 +11,32 @@ def anyio_backend(): @pytest.fixture -def temp_dir(): - with TemporaryDirectory() as tmpdir: - yield tmpdir - - -@pytest.fixture -async def server(temp_dir): - server = HttpServer(root_dir=temp_dir) - await server.start() - try: - yield server - finally: - await server.stop() +def http(tmp_path): + with serve(HttpServer(root_dir=str(tmp_path))) as client: + client.start() + try: + yield client + finally: + client.stop() @pytest.mark.anyio -async def test_http_server(server): +async def test_http_server(http, tmp_path): + local_filename = "test_src.txt" filename = "test.txt" test_content = b"test content" - send_stream, receive_stream = create_memory_object_stream(max_buffer_size=1024) - - resource_uuid = uuid.uuid4() - server.resources[resource_uuid] = receive_stream - - resource_handle = ClientStreamResource(uuid=resource_uuid).model_dump(mode="json") + (tmp_path / local_filename).write_bytes(test_content) - async def send_data(): - await send_stream.send(test_content) - await send_stream.aclose() + file = http.storage.open(filename, "wb") + file.write(str(tmp_path / local_filename)) + file.close() - async with anyio.create_task_group() as tg: - tg.start_soon(send_data) + print(http.storage.stat(filename)) - uploaded_url = await server.put_file(filename, resource_handle) - assert uploaded_url == f"{server.get_url()}/{filename}" + uploaded_url = f"{http.get_url()}/{filename}" - files = server.list_files() + files = list(http.storage.list("/")) assert filename in files async with aiohttp.ClientSession() as session: @@ -63,20 +45,19 @@ async def send_data(): retrieved_content = await response.read() assert retrieved_content == test_content - deleted_filename = await server.delete_file(filename) - assert deleted_filename == filename + http.storage.delete(filename) - files_after_deletion = server.list_files() + files_after_deletion = list(http.storage.list("/")) assert filename not in files_after_deletion -def test_http_server_host_config(temp_dir): +def test_http_server_host_config(tmp_path): custom_host = "192.168.1.1" - server = HttpServer(root_dir=temp_dir, host=custom_host) + server = HttpServer(root_dir=str(tmp_path), host=custom_host) assert server.get_host() == custom_host -def test_http_server_root_directory_creation(temp_dir): - new_dir = os.path.join(temp_dir, "new_http_root") - _ = HttpServer(root_dir=new_dir) - assert os.path.exists(new_dir) +def test_http_server_root_directory_creation(tmp_path): + new_dir = tmp_path / "new_http_root" + _ = HttpServer(root_dir=str(new_dir)) + assert new_dir.exists() diff --git a/packages/jumpstarter-driver-http/pyproject.toml b/packages/jumpstarter-driver-http/pyproject.toml index 1a2afd819..d8158a754 100644 --- a/packages/jumpstarter-driver-http/pyproject.toml +++ b/packages/jumpstarter-driver-http/pyproject.toml @@ -10,6 +10,7 @@ requires-python = ">=3.12" dependencies = [ "anyio>=4.6.2.post1", "jumpstarter", + "jumpstarter-driver-composite", "jumpstarter-driver-opendal" ] diff --git a/uv.lock b/uv.lock index 62742f552..6d89f0b38 100644 --- a/uv.lock +++ b/uv.lock @@ -1151,6 +1151,7 @@ source = { editable = "packages/jumpstarter-driver-http" } dependencies = [ { name = "anyio" }, { name = "jumpstarter" }, + { name = "jumpstarter-driver-composite" }, { name = "jumpstarter-driver-opendal" }, ] @@ -1165,6 +1166,7 @@ dev = [ requires-dist = [ { name = "anyio", specifier = ">=4.6.2.post1" }, { name = "jumpstarter", editable = "packages/jumpstarter" }, + { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, { name = "jumpstarter-driver-opendal", editable = "packages/jumpstarter-driver-opendal" }, ] From bac14ea79ebe3c6675f613abdc5b2ff04d4b6886 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:07:48 -0500 Subject: [PATCH 02/13] Implement read/write_bytes helper on OpendalFile --- .../jumpstarter_driver_opendal/client.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py index 11e3902f4..ec5ae7687 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py @@ -2,10 +2,12 @@ from collections.abc import Generator from dataclasses import dataclass +from io import BytesIO from pathlib import Path from uuid import UUID import asyncclick as click +from anyio.abc import ObjectStream from opendal import Operator from pydantic import ConfigDict, validate_call @@ -14,6 +16,23 @@ from jumpstarter.client import DriverClient +@dataclass(kw_only=True) +class BytesIOStream(ObjectStream[bytes]): + buf: BytesIO + + async def send(self, item: bytes): + self.buf.write(item) + + async def receive(self) -> bytes: + return self.buf.read(size=65535) + + async def send_eof(self): + pass + + async def aclose(self): + pass + + @dataclass(kw_only=True) class OpendalFile: """ @@ -51,6 +70,21 @@ def read(self, path: PathBuf, operator: Operator | None = None): with OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb") as handle: return self.__read(handle) + @validate_call(validate_return=True) + def write_bytes(self, data: bytes): + buf = BytesIO(data) + async with BytesIOStream(buf=buf) as stream: + async with self.resource_async(stream) as handle: + return self.__write(handle) + + @validate_call(validate_return=True) + def read_bytes(self) -> bytes: + buf = BytesIO() + async with BytesIOStream(buf=buf) as stream: + async with self.resource_async(stream) as handle: + return self.__read(handle) + return buf.getvalue() + @validate_call(validate_return=True) def seek(self, pos: int, whence: int = 0) -> int: """ From 2f04dbdc1d811d00e4d1f0ffc725272134b1b957 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:13:39 -0500 Subject: [PATCH 03/13] fixup! Implement read/write_bytes helper on OpendalFile --- .../jumpstarter_driver_opendal/client.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py index ec5ae7687..064585ea5 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py @@ -7,6 +7,7 @@ from uuid import UUID import asyncclick as click +from anyio import EndOfStream from anyio.abc import ObjectStream from opendal import Operator from pydantic import ConfigDict, validate_call @@ -24,7 +25,10 @@ async def send(self, item: bytes): self.buf.write(item) async def receive(self) -> bytes: - return self.buf.read(size=65535) + item = self.buf.read(65535) + if len(item) == 0: + raise EndOfStream + return item async def send_eof(self): pass @@ -73,15 +77,15 @@ def read(self, path: PathBuf, operator: Operator | None = None): @validate_call(validate_return=True) def write_bytes(self, data: bytes): buf = BytesIO(data) - async with BytesIOStream(buf=buf) as stream: - async with self.resource_async(stream) as handle: + with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: + with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: return self.__write(handle) @validate_call(validate_return=True) def read_bytes(self) -> bytes: buf = BytesIO() - async with BytesIOStream(buf=buf) as stream: - async with self.resource_async(stream) as handle: + with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: + with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: return self.__read(handle) return buf.getvalue() From 8d095472e19a84b8103d4436a093e5b5753eae8f Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:13:47 -0500 Subject: [PATCH 04/13] Use OpendalFile.write_bytes helper on http driver test --- .../jumpstarter_driver_http/driver_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py index 118746b8d..ec38bc274 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py @@ -22,14 +22,11 @@ def http(tmp_path): @pytest.mark.anyio async def test_http_server(http, tmp_path): - local_filename = "test_src.txt" filename = "test.txt" test_content = b"test content" - (tmp_path / local_filename).write_bytes(test_content) - file = http.storage.open(filename, "wb") - file.write(str(tmp_path / local_filename)) + file.write_bytes(test_content) file.close() print(http.storage.stat(filename)) From 61f7d11465ef66778e8bfb800adfe631b3a1308e Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:16:17 -0500 Subject: [PATCH 05/13] Use OpendalFile.write_bytes helper on tftp driver test --- .../jumpstarter_driver_tftp/driver_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py b/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py index e29b648bb..6ca6f9b6e 100644 --- a/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py +++ b/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py @@ -21,14 +21,11 @@ def tftp(tmp_path): @pytest.mark.anyio async def test_tftp_file_operations(tftp, tmp_path): - local_filename = "test_src.txt" filename = "test.txt" test_data = b"Hello" - (tmp_path / local_filename).write_bytes(test_data) - file = tftp.storage.open(filename, "wb") - file.write(str(tmp_path / local_filename)) + file.write_bytes(test_data) file.close() files = list(tftp.storage.list("/")) From 61f39a4053fe19122d100d976c39a76f953644ff Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:20:20 -0500 Subject: [PATCH 06/13] Add read/write_bytes helper to OpendalClient --- .../jumpstarter_driver_opendal/client.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py index 064585ea5..fe71081ab 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Generator +from contextlib import closing from dataclasses import dataclass from io import BytesIO from pathlib import Path @@ -151,6 +152,16 @@ def writable(self) -> bool: class OpendalClient(DriverClient): + @validate_call(validate_return=True) + def write_bytes(self, /, path: PathBuf, data: bytes) -> None: + with closing(self.open(path, "wb")) as f: + f.write_bytes(data) + + @validate_call(validate_return=True) + def read_bytes(self, /, path: PathBuf) -> bytes: + with closing(self.open(path, "rb")) as f: + return f.read_bytes() + @validate_call def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile: """ From b3c421c6ecbcf63acae3affcb0e151a592f3da6d Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:23:29 -0500 Subject: [PATCH 07/13] fixup! Use OpendalFile.write_bytes helper on tftp driver test --- .../jumpstarter_driver_tftp/driver_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py b/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py index 6ca6f9b6e..6067415b0 100644 --- a/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py +++ b/packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py @@ -24,9 +24,7 @@ async def test_tftp_file_operations(tftp, tmp_path): filename = "test.txt" test_data = b"Hello" - file = tftp.storage.open(filename, "wb") - file.write_bytes(test_data) - file.close() + tftp.storage.write_bytes(filename, test_data) files = list(tftp.storage.list("/")) assert filename in files From 9ca1ed1dcc21c57c61f12a7a72f06f5a8838860b Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:23:45 -0500 Subject: [PATCH 08/13] fixup! Use OpendalFile.write_bytes helper on http driver test --- .../jumpstarter_driver_http/driver_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py index ec38bc274..aa4400b83 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py @@ -25,9 +25,7 @@ async def test_http_server(http, tmp_path): filename = "test.txt" test_content = b"test content" - file = http.storage.open(filename, "wb") - file.write_bytes(test_content) - file.close() + http.storage.write_bytes(filename, test_content) print(http.storage.stat(filename)) From a3e04a02c12bcb8945a5e61907638d9604295b5b Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:23:59 -0500 Subject: [PATCH 09/13] Use OpendalFile.write_bytes helper on opendal doctest --- docs/source/api-reference/drivers/opendal.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/docs/source/api-reference/drivers/opendal.md b/docs/source/api-reference/drivers/opendal.md index 57b497b1f..0c76e66a0 100644 --- a/docs/source/api-reference/drivers/opendal.md +++ b/docs/source/api-reference/drivers/opendal.md @@ -13,11 +13,9 @@ The Opendal driver is a driver for interacting with storages attached to the exp ```{doctest} >>> from tempfile import NamedTemporaryFile >>> opendal.create_dir("test/directory/") ->>> remote_file = opendal.open("test/directory/file", "wb") ->>> with NamedTemporaryFile() as local_file: -... assert local_file.write(b"hello") == 5 -... remote_file.write(local_file.name) ->>> remote_file.close() +>>> opendal.write_bytes("test/directory/file", b"hello") +>>> assert opendal.hash("test/directory/file", "md5") == "5d41402abc4b2a76b9719d911017c592" +>>> opendal.remove_all("test/") ``` ```{testsetup} * From 1537f9da32aca985a4e6c4711241caf685184868 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:26:28 -0500 Subject: [PATCH 10/13] Run ruff on jumpstarter/common/grpc.py --- .../jumpstarter/jumpstarter/common/grpc.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/jumpstarter/jumpstarter/common/grpc.py b/packages/jumpstarter/jumpstarter/common/grpc.py index 382f1e936..8536be200 100644 --- a/packages/jumpstarter/jumpstarter/common/grpc.py +++ b/packages/jumpstarter/jumpstarter/common/grpc.py @@ -35,12 +35,18 @@ def ssl_channel_credentials(target: str, tls_config): def aio_secure_channel(target: str, credentials: grpc.ChannelCredentials): - return grpc.aio.secure_channel(target, credentials, options=( - ("grpc.lb_policy_name", "round_robin"), - ("grpc.keepalive_time_ms", 350000), - ("grpc.keepalive_timeout_ms", 5000), - ("grpc.http2.max_pings_without_data", 5), - ("grpc.keepalive_permit_without_calls", 1))) + return grpc.aio.secure_channel( + target, + credentials, + options=( + ("grpc.lb_policy_name", "round_robin"), + ("grpc.keepalive_time_ms", 350000), + ("grpc.keepalive_timeout_ms", 5000), + ("grpc.http2.max_pings_without_data", 5), + ("grpc.keepalive_permit_without_calls", 1), + ), + ) + def configure_grpc_env(): # disable informative logs by default, i.e.: From f8f2ffd0570f23436794e50629e0d65f4fb4f57e Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 12:38:25 -0500 Subject: [PATCH 11/13] Add write_from_path and read_into_path helper --- .../jumpstarter_driver_opendal/client.py | 24 +++++++++++++++---- .../jumpstarter_driver_opendal/driver_test.py | 9 ++++--- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py index fe71081ab..e247cdca9 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py @@ -53,24 +53,30 @@ def __write(self, handle): def __read(self, handle): return self.client.call("file_read", self.fd, handle) + def __fs_operator_for_path(self, path: PathBuf) -> Operator: + if Path(path).is_absolute(): + return Operator("fs", root="/") + else: + return Operator("fs", root=str(Path.cwd())) + @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) - def write(self, path: PathBuf, operator: Operator | None = None): + def write_from_path(self, path: PathBuf, operator: Operator | None = None): """ Write into remote file with content from local file """ if operator is None: - operator = Operator("fs", root="/") + operator = self.__fs_operator_for_path(path) with OpendalAdapter(client=self.client, operator=operator, path=path) as handle: return self.__write(handle) @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) - def read(self, path: PathBuf, operator: Operator | None = None): + def read_into_path(self, path: PathBuf, operator: Operator | None = None): """ Read content from remote file into local file """ if operator is None: - operator = Operator("fs", root="/") + operator = self.__fs_operator_for_path(path) with OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb") as handle: return self.__read(handle) @@ -162,6 +168,16 @@ def read_bytes(self, /, path: PathBuf) -> bytes: with closing(self.open(path, "rb")) as f: return f.read_bytes() + @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) + def write_from_path(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> None: + with closing(self.open(dst, "wb")) as f: + f.write_from_path(src, operator) + + @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) + def read_into_path(self, src: PathBuf, dst: PathBuf, operator: Operator | None = None) -> None: + with closing(self.open(src, "rb")) as f: + f.read_into_path(dst, operator) + @validate_call def open(self, /, path: PathBuf, mode: Mode) -> OpendalFile: """ diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py index ea8d432c9..7d7fbb4cf 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py @@ -32,13 +32,12 @@ def test_drivers_opendal(tmp_path): assert not test_file.readable() assert not test_file.seekable() assert test_file.writable() - - (tmp_path / "src").write_text("hello") - test_file.write(tmp_path / "src") - test_file.close() assert test_file.closed + (tmp_path / "src").write_text("hello") + client.write_from_path("test_dir/test_file", tmp_path / "src") + test_file = client.open("test_dir/test_file", "rb") assert not test_file.closed assert test_file.readable() @@ -54,7 +53,7 @@ def test_drivers_opendal(tmp_path): == "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" ) - test_file.read(tmp_path / "dst") + test_file.read_into_path(tmp_path / "dst") assert (tmp_path / "dst").read_text() == "llo" assert client.stat("dst").content_length == 3 From 89883671cf468db3eb07b6ace8e9a0f5d23a7a68 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 13:01:01 -0500 Subject: [PATCH 12/13] Better testing for opendal driver --- .../jumpstarter_driver_opendal/client.py | 17 +-- .../jumpstarter_driver_opendal/common.py | 4 +- .../jumpstarter_driver_opendal/driver_test.py | 143 ++++++++++++------ 3 files changed, 109 insertions(+), 55 deletions(-) diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py index e247cdca9..9b3f23015 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/client.py @@ -53,11 +53,8 @@ def __write(self, handle): def __read(self, handle): return self.client.call("file_read", self.fd, handle) - def __fs_operator_for_path(self, path: PathBuf) -> Operator: - if Path(path).is_absolute(): - return Operator("fs", root="/") - else: - return Operator("fs", root=str(Path.cwd())) + def __fs_operator_for_path(self, path: PathBuf) -> (PathBuf, Operator): + return Path(path).resolve(), Operator("fs", root="/") @validate_call(validate_return=True, config=ConfigDict(arbitrary_types_allowed=True)) def write_from_path(self, path: PathBuf, operator: Operator | None = None): @@ -65,7 +62,7 @@ def write_from_path(self, path: PathBuf, operator: Operator | None = None): Write into remote file with content from local file """ if operator is None: - operator = self.__fs_operator_for_path(path) + path, operator = self.__fs_operator_for_path(path) with OpendalAdapter(client=self.client, operator=operator, path=path) as handle: return self.__write(handle) @@ -76,24 +73,24 @@ def read_into_path(self, path: PathBuf, operator: Operator | None = None): Read content from remote file into local file """ if operator is None: - operator = self.__fs_operator_for_path(path) + path, operator = self.__fs_operator_for_path(path) with OpendalAdapter(client=self.client, operator=operator, path=path, mode="wb") as handle: return self.__read(handle) @validate_call(validate_return=True) - def write_bytes(self, data: bytes): + def write_bytes(self, data: bytes) -> None: buf = BytesIO(data) with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: - return self.__write(handle) + self.__write(handle) @validate_call(validate_return=True) def read_bytes(self) -> bytes: buf = BytesIO() with self.client.portal.wrap_async_context_manager(BytesIOStream(buf=buf)) as stream: with self.client.portal.wrap_async_context_manager(self.client.resource_async(stream)) as handle: - return self.__read(handle) + self.__read(handle) return buf.getvalue() @validate_call(validate_return=True) diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py index d5f68af71..f847e4458 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/common.py @@ -24,10 +24,10 @@ def __validate(cls, data: Any): return data def is_file(self) -> bool: - return self._is_file + return self.entry_is_file def is_dir(self) -> bool: - return self._is_dir + return self.entry_is_dir class Metadata(BaseModel): diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py index 7d7fbb4cf..485ec8098 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py @@ -1,3 +1,5 @@ +import hashlib +import os from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from random import randbytes @@ -12,70 +14,125 @@ from jumpstarter.common.utils import serve -def test_drivers_opendal(tmp_path): +@pytest.fixture(scope="function") +def opendal(tmp_path): with serve(Opendal(scheme="fs", kwargs={"root": str(tmp_path)})) as client: - assert not client.capability().presign + yield client - client.create_dir("test_dir/") - client.create_dir("demo_dir/nest_dir/") - assert client.exists("test_dir/") - assert client.exists("demo_dir/nest_dir/") +test_file = "test_file.txt" +test_content = b"hello" - assert client.stat("test_dir/").mode.is_dir - assert sorted(client.list("/")) == ["/", "demo_dir/", "test_dir/"] - assert sorted(client.scan("/")) == ["/", "demo_dir/", "demo_dir/nest_dir/", "test_dir/"] +def test_driver_opendal_read_write_bytes(opendal): + opendal.write_bytes(test_file, test_content) - test_file = client.open("test_dir/test_file", "wb") - assert not test_file.closed - assert not test_file.readable() - assert not test_file.seekable() - assert test_file.writable() - test_file.close() - assert test_file.closed + assert opendal.read_bytes(test_file) == test_content + assert opendal.hash(test_file, "md5") == hashlib.md5(test_content).hexdigest() + assert opendal.hash(test_file, "sha256") == hashlib.sha256(test_content).hexdigest() - (tmp_path / "src").write_text("hello") - client.write_from_path("test_dir/test_file", tmp_path / "src") - test_file = client.open("test_dir/test_file", "rb") - assert not test_file.closed - assert test_file.readable() - assert test_file.seekable() - assert not test_file.writable() +def test_driver_opendal_read_write_path(opendal, tmp_path): + src = tmp_path / "src" + dst = tmp_path / "dst" - assert test_file.tell() == 0 - assert test_file.seek(2) == 2 + src.write_bytes(test_content) - assert client.hash("test_dir/test_file", "md5") == "5d41402abc4b2a76b9719d911017c592" - assert ( - client.hash("test_dir/test_file", "sha256") - == "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" - ) + opendal.write_from_path(test_file, src) + opendal.read_into_path(test_file, dst) + + assert dst.read_bytes() == test_content + + +def test_driver_opendal_seek_tell(opendal): + off = -3 + pos = len(test_content) + off + + assert pos >= 0 + + opendal.write_bytes(test_file, test_content) + + file = opendal.open(test_file, "rb") + file.seek(off, os.SEEK_END) + + assert file.tell() == pos + assert file.read_bytes() == test_content[off:] + + file.close() + + +def test_driver_opendal_file_property(opendal): + file = opendal.open(test_file, "wb") + + assert not file.closed + assert not file.readable() + assert not file.seekable() + assert file.writable() + + file.close() + + assert file.closed + + file = opendal.open(test_file, "rb") + + assert not file.closed + assert file.readable() + assert file.seekable() + assert not file.writable() - test_file.read_into_path(tmp_path / "dst") - assert (tmp_path / "dst").read_text() == "llo" + file.close() - assert client.stat("dst").content_length == 3 + assert file.closed - test_file.close() - assert test_file.closed - client.copy("test_dir/test_file", "test_dir/copy_file") - client.rename("test_dir/copy_file", "test_dir/rename_file") - assert not client.exists("test_dir/copy_file") - assert client.exists("test_dir/rename_file") +def test_driver_opendal_file_metadata(opendal): + opendal.write_bytes(test_file, test_content) - client.delete("test_dir/rename_file") - assert not client.exists("test_dir/rename_file") + assert opendal.exists(test_file) + assert opendal.stat(test_file).mode.is_file() - client.remove_all("test_dir/") - assert not client.exists("test_dir/") + opendal.copy(test_file, "copy_of_test_file") + assert opendal.exists("copy_of_test_file") + + opendal.rename("copy_of_test_file", "renamed_copy_of_test_file") + + assert not opendal.exists("copy_of_test_file") + assert opendal.exists("renamed_copy_of_test_file") + + opendal.delete("renamed_copy_of_test_file") + + assert not opendal.exists("renamed_copy_of_test_file") + + opendal.create_dir("test_dir/") + + assert opendal.exists("test_dir/") + + assert opendal.stat("test_dir/").mode.is_dir() + + opendal.remove_all("test_dir/") + + assert not opendal.exists("test_dir/") + + +def test_driver_opendal_file_list_scan(opendal): + opendal.create_dir("a/b/c/") + opendal.create_dir("d/e/") + + assert sorted(opendal.list("/")) == ["/", "a/", "d/"] + assert sorted(opendal.scan("/")) == ["/", "a/", "a/b/", "a/b/c/", "d/", "d/e/"] + + +def test_driver_opendal_presign(tmp_path): with serve(Opendal(scheme="http", kwargs={"endpoint": "http://invalid.invalid"})) as client: + capability = client.capability() + + assert capability.presign_read assert client.presign_read("test", 100) == PresignedRequest( url="http://invalid.invalid/test", method="GET", headers={} ) + + assert capability.presign_stat assert client.presign_stat("test", 100) == PresignedRequest( url="http://invalid.invalid/test", method="HEAD", headers={} ) From 052cf5babe566da84eff8d9cc76d8218acd94069 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Thu, 27 Feb 2025 13:30:05 -0500 Subject: [PATCH 13/13] Bring back put_file helper --- .../jumpstarter_driver_http/client.py | 19 +++++++++++++++++++ .../jumpstarter_driver_http/driver_test.py | 6 +++--- .../jumpstarter-driver-http/pyproject.toml | 3 ++- uv.lock | 2 ++ 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py index 2f5ed37d3..6b431ea72 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/client.py @@ -1,6 +1,9 @@ from dataclasses import dataclass from jumpstarter_driver_composite.client import CompositeClient +from jumpstarter_driver_opendal.common import PathBuf +from opendal import Operator +from yarl import URL @dataclass(kw_only=True) @@ -52,3 +55,19 @@ def get_url(self) -> str: str: The base URL of the server """ return self.call("get_url") + + def put_file(self, dst: PathBuf, src: PathBuf, operator: Operator | None = None) -> str: + """ + Upload a file to the HTTP server using a opendal operator as source. + + Args: + dst (PathBuf): Name to save the file as on the server. + src (PathBuf): Name to read the file from opendal operator. + operator (Operator): opendal operator to read the file from, defaults to local fs. + + Returns: + str: URL of the uploaded file + """ + self.storage.write_from_path(dst, src, operator) + + return str(URL(self.get_url()).joinpath(dst)) diff --git a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py index aa4400b83..da75ff96b 100644 --- a/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py +++ b/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver_test.py @@ -25,11 +25,11 @@ async def test_http_server(http, tmp_path): filename = "test.txt" test_content = b"test content" - http.storage.write_bytes(filename, test_content) + (tmp_path / "src").write_bytes(test_content) - print(http.storage.stat(filename)) + uploaded_url = http.put_file(filename, tmp_path / "src") - uploaded_url = f"{http.get_url()}/{filename}" + print(http.storage.stat(filename)) files = list(http.storage.list("/")) assert filename in files diff --git a/packages/jumpstarter-driver-http/pyproject.toml b/packages/jumpstarter-driver-http/pyproject.toml index d8158a754..cf3191e25 100644 --- a/packages/jumpstarter-driver-http/pyproject.toml +++ b/packages/jumpstarter-driver-http/pyproject.toml @@ -11,7 +11,8 @@ dependencies = [ "anyio>=4.6.2.post1", "jumpstarter", "jumpstarter-driver-composite", - "jumpstarter-driver-opendal" + "jumpstarter-driver-opendal", + "yarl>=1.18.3", ] [tool.hatch.version] diff --git a/uv.lock b/uv.lock index 6d89f0b38..f66dceb8d 100644 --- a/uv.lock +++ b/uv.lock @@ -1153,6 +1153,7 @@ dependencies = [ { name = "jumpstarter" }, { name = "jumpstarter-driver-composite" }, { name = "jumpstarter-driver-opendal" }, + { name = "yarl" }, ] [package.dev-dependencies] @@ -1168,6 +1169,7 @@ requires-dist = [ { name = "jumpstarter", editable = "packages/jumpstarter" }, { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, { name = "jumpstarter-driver-opendal", editable = "packages/jumpstarter-driver-opendal" }, + { name = "yarl", specifier = ">=1.18.3" }, ] [package.metadata.requires-dev]