feat(cache): add upload/download support with multi-scheme downloaders (RFE429)#430
Conversation
…s (RFE429) Introduce upload_object/download_object APIs for streaming file transfers via Arrow Flight. Add scheme-based downloader registry (file, http, https, grpc, grpcs) to executor_manager. CacheStorage backend replaces dufs for package storage, simplifying deployment by removing external dependency.
There was a problem hiding this comment.
Code Review
This pull request implements support for using flame-object-cache as a package storage backend, effectively removing external dependencies like dufs. Key changes include the addition of upload_object and download_object in the Python SDK, a new CacheStorage backend, and a scheme-based PackageDownloader registry in the Rust executor_manager. Feedback highlights a logic error regarding inconsistent key formatting in CacheStorage, performance concerns due to blocking I/O in async Rust functions, and the need for more robust error handling and connection timeouts in the gRPC and HTTP downloaders.
| def download(self, filename: str, local_path: str) -> None: | ||
| from flamepy.core.cache import ObjectRef, download_object | ||
|
|
||
| try: | ||
| ref = ObjectRef(endpoint=self._endpoint, key=filename, version=0) | ||
| download_object(ref, local_path) | ||
| logger.debug(f"Downloaded package from cache: {filename} -> {local_path}") | ||
| except Exception as e: | ||
| raise FlameError(FlameErrorCode.INTERNAL, f"Failed to download package from cache: {str(e)}") | ||
|
|
||
| def delete(self, filename: str) -> None: | ||
| from flamepy.core.cache import delete_objects | ||
|
|
||
| try: | ||
| delete_objects(filename) | ||
| logger.debug(f"Deleted package from cache: {filename}") | ||
| except Exception as e: | ||
| logger.warning(f"Error deleting package from cache: {e}") |
There was a problem hiding this comment.
The download and delete methods in CacheStorage use incorrect key formats. While upload correctly prefixes the key with {self._app_name}/pkg/, these methods use the filename directly. This inconsistency will cause operations to fail because the keys won't match the objects stored in the cache. Additionally, delete_objects expects a valid key prefix (e.g., app/session), so a bare filename will likely trigger a validation error on the server.
def download(self, filename: str, local_path: str) -> None:
from flamepy.core.cache import ObjectRef, download_object
if not self._app_name:
raise FlameError(FlameErrorCode.INVALID_CONFIG, "app_name is required for download")
try:
key = f"{self._app_name}/pkg/{filename}"
ref = ObjectRef(endpoint=self._endpoint, key=key, version=0)
download_object(ref, local_path)
logger.debug(f"Downloaded package from cache: {key} -> {local_path}")
except Exception as e:
raise FlameError(FlameErrorCode.INTERNAL, f"Failed to download package from cache: {str(e)}")
def delete(self, filename: str) -> None:
from flamepy.core.cache import delete_objects
if not self._app_name:
return
try:
key = f"{self._app_name}/pkg/{filename}"
delete_objects(key)
logger.debug(f"Deleted package from cache: {key}")
except Exception as e:
logger.warning(f"Error deleting package from cache: {e}")| async fn download(&self, url: &url::Url, dest_path: &Path) -> Result<(), FlameError> { | ||
| let client = reqwest::Client::builder() | ||
| .timeout(self.timeout) | ||
| .build() | ||
| .map_err(|e| FlameError::Internal(format!("failed to create HTTP client: {}", e)))?; | ||
|
|
||
| let response = client | ||
| .get(url.as_str()) | ||
| .send() | ||
| .await | ||
| .map_err(|e| FlameError::Internal(format!("failed to download package: {}", e)))?; | ||
|
|
||
| if !response.status().is_success() { | ||
| return Err(FlameError::Internal(format!( | ||
| "failed to download package: HTTP {}", | ||
| response.status() | ||
| ))); | ||
| } | ||
|
|
||
| let temp_path = dest_path.with_extension("tmp"); | ||
| let mut file = fs::File::create(&temp_path) | ||
| .map_err(|e| FlameError::Internal(format!("failed to create temp file: {}", e)))?; | ||
|
|
||
| let mut stream = response.bytes_stream(); | ||
| while let Some(chunk) = stream.next().await { | ||
| let chunk = | ||
| chunk.map_err(|e| FlameError::Internal(format!("failed to read chunk: {}", e)))?; | ||
| file.write_all(&chunk) | ||
| .map_err(|e| FlameError::Internal(format!("failed to write chunk: {}", e)))?; | ||
| } | ||
|
|
||
| file.sync_all() | ||
| .map_err(|e| FlameError::Internal(format!("failed to sync file: {}", e)))?; | ||
| drop(file); | ||
|
|
||
| fs::rename(&temp_path, dest_path) | ||
| .map_err(|e| FlameError::Internal(format!("failed to rename temp file: {}", e)))?; | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
The HttpDownloader uses synchronous blocking I/O (std::fs and std::io::Write) within an async function. This can block the Tokio runtime threads, leading to performance degradation of the executor_manager. You should use tokio::fs and tokio::io::AsyncWriteExt instead. Furthermore, the reqwest::Client should be instantiated once (e.g., in HttpDownloader::new) and reused across calls to leverage connection pooling.
| async fn download(&self, url: &url::Url, dest_path: &Path) -> Result<(), FlameError> { | ||
| use arrow::array::{Array, BinaryArray}; | ||
| use arrow_flight::FlightClient; | ||
| use futures_util::TryStreamExt; | ||
| use tonic::transport::Channel; | ||
|
|
||
| let host = url | ||
| .host_str() | ||
| .ok_or_else(|| FlameError::InvalidConfig("missing host in grpc URL".to_string()))?; | ||
| let port = url.port().unwrap_or(9090); | ||
|
|
||
| let endpoint = if url.scheme() == "grpcs" { | ||
| format!("https://{}:{}", host, port) | ||
| } else { | ||
| format!("http://{}:{}", host, port) | ||
| }; | ||
|
|
||
| let key = url.path().trim_start_matches('/'); | ||
|
|
||
| let channel = Channel::from_shared(endpoint) | ||
| .map_err(|e| FlameError::Internal(format!("invalid endpoint: {}", e)))? | ||
| .connect() | ||
| .await | ||
| .map_err(|e| FlameError::Internal(format!("failed to connect to cache: {}", e)))?; | ||
|
|
||
| let mut client = FlightClient::new(channel); | ||
|
|
||
| let ticket = arrow_flight::Ticket::new(format!("{}:0", key)); | ||
| let mut stream = client | ||
| .do_get(ticket) | ||
| .await | ||
| .map_err(|e| FlameError::Internal(format!("do_get failed: {}", e)))?; | ||
|
|
||
| let temp_path = dest_path.with_extension("tmp"); | ||
| let mut file = fs::File::create(&temp_path) | ||
| .map_err(|e| FlameError::Internal(format!("failed to create temp file: {}", e)))?; | ||
|
|
||
| let mut total_size = 0usize; | ||
| while let Some(batch) = stream | ||
| .try_next() | ||
| .await | ||
| .map_err(|e| FlameError::Internal(format!("stream error: {}", e)))? | ||
| { | ||
| if let Some(array) = batch.column_by_name("data") { | ||
| if let Some(binary_array) = array.as_any().downcast_ref::<BinaryArray>() { | ||
| for i in 0..binary_array.len() { | ||
| let chunk = binary_array.value(i); | ||
| file.write_all(chunk).map_err(|e| { | ||
| FlameError::Internal(format!("failed to write chunk: {}", e)) | ||
| })?; | ||
| total_size += chunk.len(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if total_size == 0 { | ||
| fs::remove_file(&temp_path).ok(); | ||
| return Err(FlameError::Internal(format!("object not found: {}", key))); | ||
| } | ||
|
|
||
| file.sync_all() | ||
| .map_err(|e| FlameError::Internal(format!("failed to sync file: {}", e)))?; | ||
| drop(file); | ||
|
|
||
| fs::rename(&temp_path, dest_path) | ||
| .map_err(|e| FlameError::Internal(format!("failed to rename temp file: {}", e)))?; | ||
|
|
||
| tracing::info!( | ||
| "Downloaded package via gRPC: {} ({} bytes)", | ||
| key, | ||
| total_size | ||
| ); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Like the HTTP downloader, GrpcDownloader performs blocking file I/O in an async context. Switching to tokio::fs is recommended. Also, the gRPC connection attempt at line 130 lacks a timeout; using .connect_timeout() on the Endpoint would make the downloader more resilient to network issues or server hangs.
| writer, reader = client.do_put(descriptor, schema) | ||
|
|
||
| file_size = os.path.getsize(file_path) | ||
| try: | ||
| with open(file_path, "rb") as f: | ||
| while True: | ||
| chunk = f.read(_UPLOAD_CHUNK_SIZE) | ||
| if not chunk: | ||
| break | ||
|
|
||
| batch = pa.RecordBatch.from_arrays( | ||
| [pa.array([0], type=pa.uint64()), pa.array([chunk], type=pa.binary())], | ||
| schema=schema, | ||
| ) | ||
| writer.write_batch(batch) | ||
|
|
||
| writer.done_writing() | ||
|
|
||
| while True: | ||
| metadata_buffer = reader.read() | ||
| if metadata_buffer is None: | ||
| break | ||
| obj_ref_data = bson.decode(bytes(metadata_buffer)) | ||
| writer.close() | ||
| ref = ObjectRef( | ||
| endpoint=obj_ref_data["endpoint"], | ||
| key=obj_ref_data["key"], | ||
| version=obj_ref_data["version"], | ||
| ) | ||
| logger.debug(f"upload_object: key={ref.key}, version={ref.version}, size={file_size}") | ||
| return ref | ||
| except Exception as e: | ||
| writer.close() | ||
| raise ValueError(f"Failed to upload file to cache server: {e}") | ||
|
|
||
| writer.close() | ||
| raise ValueError("No result metadata received from cache server") |
There was a problem hiding this comment.
The error handling in upload_object has a potential UnboundLocalError. If client.do_put fails, the writer variable will not be defined, but the except block attempts to call writer.close(). It is safer to wrap the gRPC call in a try block or ensure the variables are initialized. Also, using a finally block for writer.close() would simplify the code by removing redundant calls in the try, except, and success paths.
The check_package_config fixture now recognizes storage from either package.storage or cache.endpoint, matching the new CacheStorage default.
- Runner: use cache.endpoint fallback when package.storage not set - CacheStorage: fix download/delete to use consistent key format - cache.py: fix upload_object error handling with proper finally block - downloader.rs: use tokio async I/O instead of blocking std::fs - downloader.rs: add connect_timeout for gRPC connections - Fix test expectations for CacheStorage download/delete
The test now correctly skips when either package.storage or cache.endpoint is available, matching the new Runner behavior that uses cache as fallback.
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
- Add 300s timeout to upload_object gRPC call to prevent indefinite hangs - Add common excludes to CI configs (.venv, __pycache__, .pytest_cache, etc.) to prevent uploading large directories in test packages - Update test mock to accept options parameter
Default excludes (.venv, __pycache__, .git, etc.) are now built into Runner._create_package() and merged with user-configured excludes. This ensures sensible defaults regardless of config file contents.
GrpcDownloader now accepts optional ClientTlsConfig for grpcs:// URLs. ExecutorManager passes cache.tls config to ApplicationManager, enabling secure package downloads from TLS-enabled object cache.
Add tests for: - DownloaderRegistry: default(), new_with_tls() with Some/None - FileDownloader: invalid URL error handling - HttpDownloader: constructor, connection refused - GrpcDownloader: constructor, with_tls(), missing host, default port - GrpcDownloader TLS: grpcs with/without TLS config
Summary
upload_object/download_objectAPIs to flamepy for streaming file transfers via Arrow FlightDownloaderRegistryin executor_manager supportingfile://,http://,https://,grpc://,grpcs://CacheStoragebackend that replaces external dufs dependency for package storageChanges
Python SDK (
sdk/python)flamepy/core/cache.py: Newupload_object()anddownload_object()functions with chunked streamingflamepy/runner/storage.py: NewCacheStoragebackend using gRPC;create_storage_backend()now defaults to cache when no storage URL specifiedRust Executor Manager (
executor_manager)downloader.rsmodule withPackageDownloadertrait and scheme registryDownloaderRegistrysupporting file, http/https, and grpc/grpcs protocolsApplicationManager::download_package()using the registryConfig/Deployment
flame-package-storage(dufs) container from compose filespackage.storageconfig - cache endpoint fromcache.endpointis now useddocs/designs/RFE429-cache-upload-download/FS.mdCloses #429