From 009187bd2ad5d5f81ac675e526d1bef4442fe9fe Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 15 Apr 2025 16:06:21 -0700 Subject: [PATCH 01/13] wip --- rust/lance-io/src/object_store.rs | 53 +++++++++ rust/lance-io/src/object_store/providers.rs | 121 ++++++++++++++++---- 2 files changed, 152 insertions(+), 22 deletions(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index dca342317a7..7fd1fcf6471 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -171,6 +171,59 @@ impl Default for ObjectStoreParams { } } +// We implement hash for caching +impl std::hash::Hash for ObjectStoreParams { + #[allow(deprecated)] + fn hash(&self, state: &mut H) { + self.block_size.hash(state); + if let Some((store, url)) = &self.object_store { + Arc::as_ptr(store).hash(state); + url.hash(state); + } + self.s3_credentials_refresh_offset.hash(state); + #[cfg(feature = "aws")] + if let Some(aws_credentials) = &self.aws_credentials { + Arc::as_ptr(aws_credentials).hash(state); + } + if let Some(wrapper) = &self.object_store_wrapper { + Arc::as_ptr(wrapper).hash(state); + } + if let Some(storage_options) = &self.storage_options { + for (key, value) in storage_options { + key.hash(state); + value.hash(state); + } + } + self.use_constant_size_upload_parts.hash(state); + self.list_is_lexically_ordered.hash(state); + } +} + +// We implement eq for caching +impl Eq for ObjectStoreParams {} +impl PartialEq for ObjectStoreParams { + #[allow(deprecated)] + fn eq(&self, other: &Self) -> bool { + self.block_size == other.block_size + && self + .object_store + .as_ref() + .map(|(store, url)| (Arc::as_ptr(store), url)) + == other + .object_store + .as_ref() + .map(|(store, url)| (Arc::as_ptr(store), url)) + && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset + && self.aws_credentials.as_ref().map(Arc::as_ptr) + == other.aws_credentials.as_ref().map(Arc::as_ptr) + && self.object_store_wrapper.as_ref().map(Arc::as_ptr) + == other.object_store_wrapper.as_ref().map(Arc::as_ptr) + && self.storage_options == other.storage_options + && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts + && self.list_is_lexically_ordered == other.list_is_lexically_ordered + } +} + impl ObjectStore { /// Parse from a string URI. /// diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 75fee83e1d2..598467f5b92 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -1,9 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex, Weak}, +}; use snafu::location; +use tokio::sync::RwLock; use url::Url; use super::{ObjectStore, ObjectStoreParams}; @@ -25,28 +29,83 @@ pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { #[derive(Debug)] pub struct ObjectStoreRegistry { - providers: HashMap>, - // TODO: should the registry itself have a cache? - // Cache should probable have weak references? + providers: Mutex>>, + // Cache of object stores currently in use. We use a weak reference so the + // cache itself doesn't keep them alive if no object store is actually using + // it. + cache: RwLock>>, +} + +/// Convert a URL to a cache key. +/// +/// We truncate to the first path segment. This should capture +/// buckets and prefixes. We keep URL params since those might be +/// important. +/// +/// * s3://bucket/path?param=value -> s3://bucket/path?param=value +/// * file:///path/to/file -> file:/// +fn cache_url(url: &Url) -> String { + if url.scheme() == "file" || url.scheme() == "file-object-store" { + // For file URLs, we want to cache the URL without the path. + // This is because the path can be different for different + // object stores, but we want to cache the object store itself. + format!("{}://", url.scheme()) + } else { + // drop path after bucket, but keep query params + // e.g. s3://bucket/path?param=value -> s3://bucket?param=value + let mut url = url.clone(); + let first_segment = url + .path_segments() + .and_then(|mut iter| iter.next().map(|s| s.to_string())); + if let Some(first_segment) = first_segment { + url.set_path(&first_segment); + } + url.to_string() + } } impl ObjectStoreRegistry { pub fn empty() -> Self { Self { - providers: HashMap::new(), + providers: Mutex::new(HashMap::new()), + cache: RwLock::new(HashMap::new()), } } pub fn get_provider(&self, scheme: &str) -> Option> { - self.providers.get(scheme).cloned() + self.providers + .lock() + .expect("ObjectStoreRegistry lock poisoned") + .get(scheme) + .cloned() } pub async fn get_store( &self, base_path: Url, params: &ObjectStoreParams, - ) -> Result { - // TODO: caching + ) -> Result> { + let cache_path = cache_url(&base_path); + let cache_key = (cache_path, params.clone()); + + // Check if we have a cached store for this base path and params + { + if let Some(store) = self.cache.read().await.get(&cache_key) { + if let Some(store) = store.upgrade() { + return Ok(store); + } else { + // Remove the weak reference if it is no longer valid + let mut cache_lock = self.cache.write().await; + if let Some(store) = cache_lock.get(&cache_key) { + if store.upgrade().is_none() { + // Remove the weak reference if it is no longer valid + cache_lock.remove(&cache_key); + } + } + } + } + } + let scheme = base_path.scheme(); let provider = self.get_provider(scheme).ok_or_else(|| { Error::invalid_input( @@ -54,39 +113,57 @@ impl ObjectStoreRegistry { location!(), ) })?; - provider.new_store(base_path, params).await + let store = provider.new_store(base_path, params).await?; + let store = Arc::new(store); + + { + // Insert the store into the cache + let mut cache_lock = self.cache.write().await; + cache_lock.insert(cache_key, Arc::downgrade(&store)); + } + + Ok(store) } } impl Default for ObjectStoreRegistry { fn default() -> Self { - let mut registry = Self { - providers: HashMap::new(), - }; - registry.insert("memory", Arc::new(memory::MemoryStoreProvider)); - registry.insert("file", Arc::new(local::FileStoreProvider)); + let mut providers: HashMap> = HashMap::new(); + + providers.insert("memory".into(), Arc::new(memory::MemoryStoreProvider)); + providers.insert("file".into(), Arc::new(local::FileStoreProvider)); // The "file" scheme has special optimized code paths that bypass // the ObjectStore API for better performance. However, this can make it // hard to test when using ObjectStore wrappers, such as IOTrackingStore. // So we provide a "file-object-store" scheme that uses the ObjectStore API. // The specialized code paths are differentiated by the scheme name. - registry.insert("file-object-store", Arc::new(local::FileStoreProvider)); + providers.insert( + "file-object-store".into(), + Arc::new(local::FileStoreProvider), + ); + #[cfg(feature = "aws")] { let aws = Arc::new(aws::AwsStoreProvider); - registry.insert("s3", aws.clone()); - registry.insert("s3+ddb", aws); + providers.insert("s3".into(), aws.clone()); + providers.insert("s3+ddb".into(), aws); } #[cfg(feature = "azure")] - registry.insert("az", Arc::new(azure::AzureBlobStoreProvider)); + providers.insert("az".into(), Arc::new(azure::AzureBlobStoreProvider)); #[cfg(feature = "gcp")] - registry.insert("gs", Arc::new(gcp::GcsStoreProvider)); - registry + providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider)); + Self { + providers: Mutex::new(providers), + cache: RwLock::new(HashMap::new()), + } } } impl ObjectStoreRegistry { - pub fn insert(&mut self, scheme: &str, provider: Arc) { - self.providers.insert(scheme.into(), provider); + pub fn insert(&self, scheme: &str, provider: Arc) { + self.providers + .lock() + .expect("ObjectStoreRegistry lock poisoned") + .insert(scheme.into(), provider); } } From 5cba0ca69e1c0988cd4236ae79fbfe52b88f0acc Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 16 Apr 2025 14:25:05 -0700 Subject: [PATCH 02/13] cache stores --- java/core/lance-jni/src/file_reader.rs | 1 - rust/lance-file/benches/reader.rs | 23 +- rust/lance-file/src/reader.rs | 14 +- rust/lance-index/benches/inverted.rs | 2 +- rust/lance-index/benches/ngram.rs | 2 +- rust/lance-index/src/scalar/btree.rs | 2 +- .../src/scalar/inverted/builder.rs | 2 +- rust/lance-index/src/scalar/lance_format.rs | 9 +- rust/lance-index/src/scalar/ngram.rs | 10 +- rust/lance-io/benches/scheduler.rs | 2 +- rust/lance-io/src/object_store.rs | 234 +++++++----------- rust/lance-io/src/object_store/providers.rs | 114 +++++++-- .../src/object_store/providers/aws.rs | 8 +- .../src/object_store/providers/azure.rs | 9 +- .../src/object_store/providers/gcp.rs | 9 +- .../src/object_store/providers/local.rs | 6 +- .../src/object_store/providers/memory.rs | 29 +-- rust/lance/benches/scalar_index.rs | 7 +- rust/lance/src/dataset.rs | 111 ++++++--- rust/lance/src/dataset/builder.rs | 32 ++- rust/lance/src/dataset/fragment/write.rs | 9 +- rust/lance/src/dataset/index.rs | 2 +- rust/lance/src/dataset/write.rs | 11 +- rust/lance/src/dataset/write/commit.rs | 18 +- rust/lance/src/dataset/write/insert.rs | 11 +- rust/lance/src/session.rs | 19 +- 26 files changed, 386 insertions(+), 310 deletions(-) diff --git a/java/core/lance-jni/src/file_reader.rs b/java/core/lance-jni/src/file_reader.rs index 35ca848d441..cf9d301d961 100644 --- a/java/core/lance-jni/src/file_reader.rs +++ b/java/core/lance-jni/src/file_reader.rs @@ -89,7 +89,6 @@ fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result, with_position: bool) -> Result { let tmpdir = tempdir()?; let store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(tmpdir.path())?, FileMetadataCache::no_cache(), )); diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index f6ac7e56cf4..39c38bbadd2 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -51,11 +51,10 @@ impl DeepSizeOf for LanceIndexStore { impl LanceIndexStore { /// Create a new index store at the given directory pub fn new( - object_store: ObjectStore, + object_store: Arc, index_dir: Path, metadata_cache: FileMetadataCache, ) -> Self { - let object_store = Arc::new(object_store); let scheduler = ScanScheduler::new( object_store.clone(), SchedulerConfig::max_bandwidth(&object_store), @@ -319,6 +318,7 @@ pub mod tests { use arrow_select::take::TakeOptions; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; + use futures::FutureExt; use lance_core::{cache::CapacityMode, utils::mask::RowIdTreeMap}; use lance_datagen::{array, gen, ArrayGeneratorExt, BatchCount, ByteCount, RowCount}; use tempfile::{tempdir, TempDir}; @@ -326,7 +326,10 @@ pub mod tests { fn test_store(tempdir: &TempDir) -> Arc { let test_path: &Path = tempdir.path(); let (object_store, test_path) = - ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap(); + ObjectStore::from_uri(test_path.as_os_str().to_str().unwrap()) + .now_or_never() + .unwrap() + .unwrap(); let cache = FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes); Arc::new(LanceIndexStore::new(object_store, test_path, cache)) } diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index 28306409c5e..8a780d4e179 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -643,7 +643,7 @@ impl NGramIndexBuilder { let tmpdir = Arc::new(tempdir()?); let spill_store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(tmpdir.path())?, FileMetadataCache::no_cache(), )); @@ -1249,7 +1249,7 @@ mod tests { let tmpdir = Arc::new(tempdir().unwrap()); let test_store = LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(tmpdir.path()).unwrap(), FileMetadataCache::no_cache(), ); @@ -1453,7 +1453,7 @@ mod tests { let new_tmpdir = Arc::new(tempdir().unwrap()); let test_store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(new_tmpdir.path()).unwrap(), FileMetadataCache::no_cache(), )); @@ -1488,7 +1488,7 @@ mod tests { let new_tmpdir = Arc::new(tempdir().unwrap()); let test_store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(new_tmpdir.path()).unwrap(), FileMetadataCache::no_cache(), )); @@ -1528,7 +1528,7 @@ mod tests { let new_tmpdir = Arc::new(tempdir().unwrap()); let test_store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(new_tmpdir.path()).unwrap(), FileMetadataCache::no_cache(), )); diff --git a/rust/lance-io/benches/scheduler.rs b/rust/lance-io/benches/scheduler.rs index bcb73a2695a..b536781ba4b 100644 --- a/rust/lance-io/benches/scheduler.rs +++ b/rust/lance-io/benches/scheduler.rs @@ -46,7 +46,7 @@ async fn create_data(num_bytes: u64) -> (Arc, Path) { rand::thread_rng().fill_bytes(&mut some_data); obj_store.put(&tmp_file, &some_data).await.unwrap(); - (Arc::new(obj_store), tmp_file) + (obj_store, tmp_file) } const DATA_SIZE: u64 = 128 * 1024 * 1024; diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 7fd1fcf6471..2d343aef656 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::ops::Range; -use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; @@ -22,7 +21,7 @@ use list_retry::ListRetryStream; #[cfg(feature = "aws")] use object_store::aws::AwsCredentialProvider; use object_store::DynObjectStore; -use object_store::{local::LocalFileSystem, Error as ObjectStoreError}; +use object_store::Error as ObjectStoreError; use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore}; use providers::local::FileStoreProvider; use providers::memory::MemoryStoreProvider; @@ -35,7 +34,6 @@ use super::local::LocalObjectReader; mod list_retry; pub mod providers; mod tracing; -use self::tracing::ObjectStoreTracingExt; use crate::object_writer::WriteResult; use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader}; use lance_core::{Error, Result}; @@ -224,11 +222,58 @@ impl PartialEq for ObjectStoreParams { } } +fn uri_to_url(uri: &str) -> Result { + match Url::parse(uri) { + Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { + // On Windows, the drive is parsed as a scheme + local_path_to_url(uri) + } + Ok(url) => Ok(url), + Err(_) => local_path_to_url(uri), + } +} + +fn local_path_to_url(str_path: &str) -> Result { + let expanded = tilde(str_path).to_string(); + + let mut expanded_path = path_abs::PathAbs::new(expanded) + .unwrap() + .as_path() + .to_path_buf(); + // path_abs::PathAbs::new(".") returns an empty string. + if let Some(s) = expanded_path.as_path().to_str() { + if s.is_empty() { + expanded_path = std::env::current_dir()?; + } + } + + Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput { + source: format!("Invalid table location: '{}'", str_path).into(), + location: location!(), + }) +} + +fn extract_path(url: &Url) -> Path { + if url.scheme() == "memory" { + let mut output = String::new(); + if let Some(domain) = url.domain() { + output.push_str(domain); + } + output.push_str(url.path()); + Path::from(output) + } else { + Path::from(url.path()) + } +} + impl ObjectStore { /// Parse from a string URI. /// /// Returns the ObjectStore instance and the absolute path to the object. - pub async fn from_uri(uri: &str) -> Result<(Self, Path)> { + /// + /// This uses the default [ObjectStoreRegistry] to find the object store. + /// To re-use object store instances, use [Self::from_uri_and_params]. + pub async fn from_uri(uri: &str) -> Result<(Arc, Path)> { let registry = Arc::new(ObjectStoreRegistry::default()); Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await @@ -241,7 +286,7 @@ impl ObjectStore { registry: Arc, uri: &str, params: &ObjectStoreParams, - ) -> Result<(Self, Path)> { + ) -> Result<(Arc, Path)> { #[allow(deprecated)] if let Some((store, path)) = params.object_store.as_ref() { let mut inner = store.clone(); @@ -258,79 +303,23 @@ impl ObjectStore { download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT, }; let path = Path::from(path.path()); - return Ok((store, path)); - } - let (object_store, path) = match Url::parse(uri) { - Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { - // On Windows, the drive is parsed as a scheme - Self::from_path(uri) - } - Ok(url) => { - let store = Self::new_from_url(registry, url.clone(), params.clone()).await?; - Ok((store, Path::from(url.path()))) - } - Err(_) => Self::from_path(uri), - }?; - - Ok(( - Self { - inner: params - .object_store_wrapper - .as_ref() - .map(|w| w.wrap(object_store.inner.clone())) - .unwrap_or(object_store.inner), - ..object_store - }, - path, - )) - } - - pub fn from_path_with_scheme(str_path: &str, scheme: &str) -> Result<(Self, Path)> { - let expanded = tilde(str_path).to_string(); - - let mut expanded_path = path_abs::PathAbs::new(expanded) - .unwrap() - .as_path() - .to_path_buf(); - // path_abs::PathAbs::new(".") returns an empty string. - if let Some(s) = expanded_path.as_path().to_str() { - if s.is_empty() { - expanded_path = std::env::current_dir()?; - } - } - Ok(( - Self { - inner: Arc::new(LocalFileSystem::new()).traced(), - scheme: String::from(scheme), - block_size: 4 * 1024, // 4KB block size - use_constant_size_upload_parts: false, - list_is_lexically_ordered: false, - io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, - download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT, - }, - Path::from_absolute_path(expanded_path.as_path())?, - )) - } - - pub fn from_path(str_path: &str) -> Result<(Self, Path)> { - Self::from_path_with_scheme(str_path, "file") - } - - async fn new_from_url( - registry: Arc, - url: Url, - params: ObjectStoreParams, - ) -> Result { - let url = ensure_table_uri(url)?; - let scheme = url.scheme(); - if let Some(provider) = registry.get_provider(scheme) { - provider.new_store(url, ¶ms).await - } else { - let err = lance_core::Error::from(object_store::Error::NotSupported { - source: format!("Unsupported URI scheme: {} in url {}", scheme, url).into(), - }); - Err(err) + return Ok((Arc::new(store), path)); } + let url = uri_to_url(uri)?; + let path = extract_path(&url); + let store = registry.get_store(url, params).await?; + Ok((store, path)) + } + + #[deprecated(note = "Use `from_uri` instead")] + pub fn from_path(str_path: &str) -> Result<(Arc, Path)> { + Self::from_uri_and_params( + Arc::new(ObjectStoreRegistry::default()), + str_path, + &Default::default(), + ) + .now_or_never() + .unwrap() } /// Local object store. @@ -366,14 +355,6 @@ impl ObjectStore { self.block_size } - pub fn set_block_size(&mut self, new_size: usize) { - self.block_size = new_size; - } - - pub fn set_io_parallelism(&mut self, io_parallelism: usize) { - self.io_parallelism = io_parallelism; - } - pub fn io_parallelism(&self) -> usize { std::env::var("LANCE_IO_THREADS") .map(|val| val.parse::().unwrap()) @@ -675,63 +656,6 @@ fn infer_block_size(scheme: &str) -> usize { } } -/// Attempt to create a Url from given table location. -/// -/// The location could be: -/// * A valid URL, which will be parsed and returned -/// * A path to a directory, which will be created and then converted to a URL. -/// -/// If it is a local path, it will be created if it doesn't exist. -/// -/// Extra slashes will be removed from the end path as well. -/// -/// Will return an error if the location is not valid. For example, -pub fn ensure_table_uri(table_uri: impl AsRef) -> Result { - let table_uri = table_uri.as_ref(); - - enum UriType { - LocalPath(PathBuf), - Url(Url), - } - let uri_type: UriType = if let Ok(url) = Url::parse(table_uri) { - if url.scheme() == "file" { - UriType::LocalPath(url.to_file_path().map_err(|err| { - let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err); - Error::InvalidTableLocation { message: msg } - })?) - // NOTE this check is required to support absolute windows paths which may properly parse as url - } else { - UriType::Url(url) - } - } else { - UriType::LocalPath(PathBuf::from(table_uri)) - }; - - // If it is a local path, we need to create it if it does not exist. - let mut url = match uri_type { - UriType::LocalPath(path) => { - let path = std::fs::canonicalize(path).map_err(|err| Error::DatasetNotFound { - path: table_uri.to_string(), - source: Box::new(err), - location: location!(), - })?; - Url::from_directory_path(path).map_err(|_| { - let msg = format!( - "Could not construct a URL from canonicalized path: {}.\n\ - Something must be very wrong with the table path.", - table_uri - ); - Error::InvalidTableLocation { message: msg } - })? - } - UriType::Url(url) => url, - }; - - let trimmed_path = url.path().trim_end_matches('/').to_owned(); - url.set_path(&trimmed_path); - Ok(url) -} - #[cfg(test)] mod tests { use super::*; @@ -751,7 +675,7 @@ mod tests { write(path, contents) } - async fn read_from_store(store: ObjectStore, path: &Path) -> Result { + async fn read_from_store(store: &ObjectStore, path: &Path) -> Result { let test_file_store = store.open(path).await.unwrap(); let size = test_file_store.size().await.unwrap(); let bytes = test_file_store.get_range(0..size).await.unwrap(); @@ -776,7 +700,7 @@ mod tests { format!("{tmp_path}/bar/foo.lance/../foo.lance"), ] { let (store, path) = ObjectStore::from_uri(uri).await.unwrap(); - let contents = read_from_store(store, &path.child("test_file")) + let contents = read_from_store(store.as_ref(), &path.child("test_file")) .await .unwrap(); assert_eq!(contents, "TEST_CONTENT"); @@ -875,7 +799,7 @@ mod tests { set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir"); let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap(); - let contents = read_from_store(store, &path.child("test_file")) + let contents = read_from_store(store.as_ref(), &path.child("test_file")) .await .unwrap(); assert_eq!(contents, "RELATIVE_URL"); @@ -886,7 +810,7 @@ mod tests { let uri = "~/foo.lance"; write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap(); let (store, path) = ObjectStore::from_uri(uri).await.unwrap(); - let contents = read_from_store(store, &path.child("test_file")) + let contents = read_from_store(store.as_ref(), &path.child("test_file")) .await .unwrap(); assert_eq!(contents, "TILDE"); @@ -1024,6 +948,24 @@ mod tests { assert_eq!(buf.as_bytes(), b"LOCAL"); } + #[test] + fn test_path_parsing() { + let cases = [ + ("file:///", ""), + ("file:///usr/local/bin", "usr/local/bin"), + ("memory://test/path", "test/path"), + ("s3://bucket/path", "path"), + ("s3+ddb://bucket/path", "path"), + ("gs://bucket/path", "path"), + ("az://account/path", "path"), + ]; + for (uri, expected_path) in cases { + let url = uri_to_url(uri).unwrap(); + let path = extract_path(&url); + assert_eq!(path.to_string(), expected_path); + } + } + #[tokio::test] #[cfg(windows)] async fn test_windows_paths() { diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 598467f5b92..6980bff3309 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -3,14 +3,14 @@ use std::{ collections::HashMap, - sync::{Arc, Mutex, Weak}, + sync::{Arc, RwLock, Weak}, }; +use deepsize::DeepSizeOf; use snafu::location; -use tokio::sync::RwLock; use url::Url; -use super::{ObjectStore, ObjectStoreParams}; +use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams}; use lance_core::error::{Error, Result}; #[cfg(feature = "aws")] @@ -29,11 +29,35 @@ pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { #[derive(Debug)] pub struct ObjectStoreRegistry { - providers: Mutex>>, + providers: RwLock>>, // Cache of object stores currently in use. We use a weak reference so the // cache itself doesn't keep them alive if no object store is actually using // it. - cache: RwLock>>, + active_stores: RwLock>>, +} + +impl DeepSizeOf for ObjectStoreRegistry { + fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { + let mut size = 0; + let providers = self + .providers + .read() + .expect("ObjectStoreRegistry lock poisoned"); + for (key, _provider) in providers.iter() { + size += key.deep_size_of(); + // Ignore provider for simplicity + } + let active_stores = self + .active_stores + .read() + .expect("ObjectStoreRegistry lock poisoned"); + for ((base_path, _params), _store) in active_stores.iter() { + size += base_path.deep_size_of(); + // Ignore params for simplicity + size += std::mem::size_of::>(); + } + size + } } /// Convert a URL to a cache key. @@ -45,7 +69,7 @@ pub struct ObjectStoreRegistry { /// * s3://bucket/path?param=value -> s3://bucket/path?param=value /// * file:///path/to/file -> file:/// fn cache_url(url: &Url) -> String { - if url.scheme() == "file" || url.scheme() == "file-object-store" { + if ["file", "file-object-store", "memory"].contains(&url.scheme()) { // For file URLs, we want to cache the URL without the path. // This is because the path can be different for different // object stores, but we want to cache the object store itself. @@ -67,19 +91,46 @@ fn cache_url(url: &Url) -> String { impl ObjectStoreRegistry { pub fn empty() -> Self { Self { - providers: Mutex::new(HashMap::new()), - cache: RwLock::new(HashMap::new()), + providers: RwLock::new(HashMap::new()), + active_stores: RwLock::new(HashMap::new()), } } pub fn get_provider(&self, scheme: &str) -> Option> { self.providers - .lock() + .read() .expect("ObjectStoreRegistry lock poisoned") .get(scheme) .cloned() } + pub fn active_stores(&self) -> Vec> { + let mut found_inactive = false; + let output = self + .active_stores + .read() + .expect("ObjectStoreRegistry lock poisoned") + .values() + .filter_map(|weak| match weak.upgrade() { + Some(store) => Some(store), + None => { + found_inactive = true; + None + } + }) + .collect(); + + if found_inactive { + // Clean up the cache by removing any weak references that are no longer valid + let mut cache_lock = self + .active_stores + .write() + .expect("ObjectStoreRegistry lock poisoned"); + cache_lock.retain(|_, weak| weak.upgrade().is_some()); + } + output + } + pub async fn get_store( &self, base_path: Url, @@ -90,12 +141,20 @@ impl ObjectStoreRegistry { // Check if we have a cached store for this base path and params { - if let Some(store) = self.cache.read().await.get(&cache_key) { + if let Some(store) = self + .active_stores + .read() + .expect("ObjectStoreRegistry lock poisoned") + .get(&cache_key) + { if let Some(store) = store.upgrade() { return Ok(store); } else { // Remove the weak reference if it is no longer valid - let mut cache_lock = self.cache.write().await; + let mut cache_lock = self + .active_stores + .write() + .expect("ObjectStoreRegistry lock poisoned"); if let Some(store) = cache_lock.get(&cache_key) { if store.upgrade().is_none() { // Remove the weak reference if it is no longer valid @@ -108,17 +167,38 @@ impl ObjectStoreRegistry { let scheme = base_path.scheme(); let provider = self.get_provider(scheme).ok_or_else(|| { + let valid_schemes = self + .providers + .read() + .expect("ObjectStoreRegistry lock poisoned") + .keys() + .cloned() + .collect::>() + .join(", "); Error::invalid_input( - format!("No object store provider found for scheme: {}", scheme), + format!( + "No object store provider found for scheme: '{}'\n valid_schemes: {}", + scheme, valid_schemes + ), location!(), ) })?; - let store = provider.new_store(base_path, params).await?; + let mut store = provider.new_store(base_path, params).await?; + + store.inner = store.inner.traced(); + + if let Some(wrapper) = ¶ms.object_store_wrapper { + store.inner = wrapper.wrap(store.inner); + } + let store = Arc::new(store); { // Insert the store into the cache - let mut cache_lock = self.cache.write().await; + let mut cache_lock = self + .active_stores + .write() + .expect("ObjectStoreRegistry lock poisoned"); cache_lock.insert(cache_key, Arc::downgrade(&store)); } @@ -153,8 +233,8 @@ impl Default for ObjectStoreRegistry { #[cfg(feature = "gcp")] providers.insert("gs".into(), Arc::new(gcp::GcsStoreProvider)); Self { - providers: Mutex::new(providers), - cache: RwLock::new(HashMap::new()), + providers: RwLock::new(providers), + active_stores: RwLock::new(HashMap::new()), } } } @@ -162,7 +242,7 @@ impl Default for ObjectStoreRegistry { impl ObjectStoreRegistry { pub fn insert(&self, scheme: &str, provider: Arc) { self.providers - .lock() + .write() .expect("ObjectStoreRegistry lock poisoned") .insert(scheme.into(), provider); } diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 2bdc38c0f26..6f910ce1034 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -25,8 +25,8 @@ use tokio::sync::RwLock; use url::Url; use crate::object_store::{ - tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams, ObjectStoreProvider, - StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, + DEFAULT_CLOUD_IO_PARALLELISM, }; use lance_core::error::{Error, Result}; @@ -92,10 +92,10 @@ impl ObjectStoreProvider for AwsStoreProvider { .with_credentials(aws_creds) .with_retry(retry_config) .with_region(region); - let store = builder.build()?; + let inner = Arc::new(builder.build()?); Ok(ObjectStore { - inner: Arc::new(store).traced(), + inner, scheme: String::from(base_path.scheme()), block_size, use_constant_size_upload_parts, diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs index cdec3f614c4..b7e29247613 100644 --- a/rust/lance-io/src/object_store/providers/azure.rs +++ b/rust/lance-io/src/object_store/providers/azure.rs @@ -10,8 +10,8 @@ use object_store::{ use url::Url; use crate::object_store::{ - tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams, ObjectStoreProvider, - StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, + DEFAULT_CLOUD_IO_PARALLELISM, }; use lance_core::error::Result; @@ -41,11 +41,10 @@ impl ObjectStoreProvider for AzureBlobStoreProvider { for (key, value) in storage_options.as_azure_options() { builder = builder.with_config(key, value); } - let store = builder.build()?; - let store = Arc::new(store).traced(); + let inner = Arc::new(builder.build()?); Ok(ObjectStore { - inner: store, + inner, scheme: String::from("az"), block_size, use_constant_size_upload_parts: false, diff --git a/rust/lance-io/src/object_store/providers/gcp.rs b/rust/lance-io/src/object_store/providers/gcp.rs index 78974d0d369..f1f0d5439ea 100644 --- a/rust/lance-io/src/object_store/providers/gcp.rs +++ b/rust/lance-io/src/object_store/providers/gcp.rs @@ -10,8 +10,8 @@ use object_store::{ use url::Url; use crate::object_store::{ - tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams, ObjectStoreProvider, - StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_CLOUD_BLOCK_SIZE, + DEFAULT_CLOUD_IO_PARALLELISM, }; use lance_core::error::Result; @@ -49,11 +49,10 @@ impl ObjectStoreProvider for GcsStoreProvider { let credential_provider = Arc::new(StaticCredentialProvider::new(credential)) as _; builder = builder.with_credentials(credential_provider); } - let store = builder.build()?; - let store = Arc::new(store).traced(); + let inner = Arc::new(builder.build()?); Ok(ObjectStore { - inner: store, + inner, scheme: String::from("gs"), block_size, use_constant_size_upload_parts: false, diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs index c66a9ab0afb..0366094ef62 100644 --- a/rust/lance-io/src/object_store/providers/local.rs +++ b/rust/lance-io/src/object_store/providers/local.rs @@ -7,8 +7,8 @@ use object_store::local::LocalFileSystem; use url::Url; use crate::object_store::{ - tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams, ObjectStoreProvider, - StorageOptions, DEFAULT_LOCAL_BLOCK_SIZE, DEFAULT_LOCAL_IO_PARALLELISM, + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_LOCAL_BLOCK_SIZE, + DEFAULT_LOCAL_IO_PARALLELISM, }; use lance_core::error::Result; @@ -22,7 +22,7 @@ impl ObjectStoreProvider for FileStoreProvider { let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default()); let download_retry_count = storage_options.download_retry_count(); Ok(ObjectStore { - inner: Arc::new(LocalFileSystem::new()).traced(), + inner: Arc::new(LocalFileSystem::new()), scheme: base_path.scheme().to_owned(), block_size, use_constant_size_upload_parts: false, diff --git a/rust/lance-io/src/object_store/providers/memory.rs b/rust/lance-io/src/object_store/providers/memory.rs index 7e9e52303e0..667058aef30 100644 --- a/rust/lance-io/src/object_store/providers/memory.rs +++ b/rust/lance-io/src/object_store/providers/memory.rs @@ -7,8 +7,7 @@ use object_store::memory::InMemory; use url::Url; use crate::object_store::{ - tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams, ObjectStoreProvider, - StorageOptions, DEFAULT_LOCAL_BLOCK_SIZE, + ObjectStore, ObjectStoreParams, ObjectStoreProvider, StorageOptions, DEFAULT_LOCAL_BLOCK_SIZE, }; use lance_core::{error::Result, utils::tokio::get_num_compute_intensive_cpus}; @@ -23,31 +22,7 @@ impl ObjectStoreProvider for MemoryStoreProvider { let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default()); let download_retry_count = storage_options.download_retry_count(); Ok(ObjectStore { - inner: Arc::new(InMemory::new()).traced(), - scheme: String::from("memory"), - block_size, - use_constant_size_upload_parts: false, - list_is_lexically_ordered: true, - io_parallelism: get_num_compute_intensive_cpus(), - download_retry_count, - }) - } -} - -/// Provides the given in-memory object store for each call to `new_store`. -#[derive(Debug)] -pub struct PersistentMemoryStoreProvider { - pub inner: Arc, -} - -#[async_trait::async_trait] -impl ObjectStoreProvider for PersistentMemoryStoreProvider { - async fn new_store(&self, _base_path: Url, params: &ObjectStoreParams) -> Result { - let block_size = params.block_size.unwrap_or(DEFAULT_LOCAL_BLOCK_SIZE); - let storage_options = StorageOptions(params.storage_options.clone().unwrap_or_default()); - let download_retry_count = storage_options.download_retry_count(); - Ok(ObjectStore { - inner: self.inner.clone(), + inner: Arc::new(InMemory::new()), scheme: String::from("memory"), block_size, use_constant_size_upload_parts: false, diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index a22ffbf97e6..7cf852fcd04 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -10,7 +10,7 @@ use arrow_array::{ use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::{physical_plan::SendableRecordBatchStream, scalar::ScalarValue}; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use lance::{io::ObjectStore, Dataset}; use lance_core::{cache::FileMetadataCache, Result}; use lance_datafusion::utils::reader_to_stream; @@ -64,7 +64,10 @@ impl BenchmarkFixture { fn test_store(tempdir: &TempDir) -> Arc { let test_path = tempdir.path(); let (object_store, test_path) = - ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap(); + ObjectStore::from_uri(test_path.as_os_str().to_str().unwrap()) + .now_or_never() + .unwrap() + .unwrap(); Arc::new(LanceIndexStore::new( object_store, test_path, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6438358384b..dec534036ed 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -197,8 +197,6 @@ pub struct ReadParams { /// If a custom object store is provided (via store_params.object_store) then this /// must also be provided. pub commit_handler: Option>, - - pub object_store_registry: Arc, } impl ReadParams { @@ -220,15 +218,6 @@ impl ReadParams { self } - /// Provide an object store registry for custom object stores - pub fn with_object_store_registry( - &mut self, - object_store_registry: Arc, - ) -> &mut Self { - self.object_store_registry = object_store_registry; - self - } - /// Use the explicit locking to resolve the latest version pub fn set_commit_lock(&mut self, lock: Arc) { self.commit_handler = Some(Arc::new(lock)); @@ -243,7 +232,6 @@ impl Default for ReadParams { session: None, store_options: None, commit_handler: None, - object_store_registry: Arc::new(ObjectStoreRegistry::default()), } } } @@ -1769,7 +1757,6 @@ mod tests { use lance_index::scalar::inverted::TokenizerConfig; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType}; - use lance_io::object_store::providers::memory::PersistentMemoryStoreProvider; use lance_linalg::distance::MetricType; use lance_table::feature_flags; use lance_table::format::{DataFile, WriterVersion}; @@ -2019,16 +2006,8 @@ mod tests { // Need to use in-memory for accurate IOPS tracking. use crate::utils::test::IoTrackingStore; - let mut store_registry = ObjectStoreRegistry::empty(); - let memory_store = Arc::new(object_store::memory::InMemory::new()); - store_registry.insert( - "memory", - Arc::new(PersistentMemoryStoreProvider { - inner: memory_store, - }), - ); - let store_registry = Arc::new(store_registry); - + // Use consistent session so memory store can be reused. + let session = Arc::new(Session::default()); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "i", DataType::Int32, @@ -2040,26 +2019,31 @@ mod tests { ) .unwrap(); let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - Dataset::write( + let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + let _original_ds = Dataset::write( batches, "memory://test", Some(WriteParams { - object_store_registry: store_registry.clone(), + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(io_stats_wrapper.clone()), + ..Default::default() + }), + session: Some(session.clone()), ..Default::default() }), ) .await .unwrap(); - // Then open with wrapping store. - let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + io_stats.lock().unwrap().read_iops = 0; + let _dataset = DatasetBuilder::from_uri("memory://test") .with_read_params(ReadParams { store_options: Some(ObjectStoreParams { object_store_wrapper: Some(io_stats_wrapper), ..Default::default() }), - object_store_registry: store_registry.clone(), + session: Some(session), ..Default::default() }) .load() @@ -6113,4 +6097,75 @@ mod tests { assert_eq!(ds.manifest.version, 1); assert_eq!(ds2.manifest.version, 1); } + + #[tokio::test] + async fn test_session_store_registry() { + // Create a session + let session = Arc::new(Session::default()); + let registry = session.store_registry(); + assert!(registry.active_stores().is_empty()); + + // Create a dataset with memory store + let write_params = WriteParams { + session: Some(session.clone()), + ..Default::default() + }; + let batch = RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + false, + )])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let dataset = InsertBuilder::new("memory://test") + .with_params(&write_params) + .execute(vec![batch.clone()]) + .await + .unwrap(); + + // Assert there is one active store. + assert_eq!(registry.active_stores().len(), 1); + + // If we create another dataset also in memory, it should re-use the + // existing store. + let dataset2 = InsertBuilder::new("memory://test2") + .with_params(&write_params) + .execute(vec![batch.clone()]) + .await + .unwrap(); + assert_eq!(registry.active_stores().len(), 1); + assert_eq!( + Arc::as_ptr(&dataset.object_store().inner), + Arc::as_ptr(&dataset2.object_store().inner) + ); + + // If we create another with **different parameters**, it should create a new store. + let write_params2 = WriteParams { + session: Some(session.clone()), + store_params: Some(ObjectStoreParams { + block_size: Some(10_000), + ..Default::default() + }), + ..Default::default() + }; + let dataset3 = InsertBuilder::new("memory://test3") + .with_params(&write_params2) + .execute(vec![batch.clone()]) + .await + .unwrap(); + assert_eq!(registry.active_stores().len(), 2); + assert_ne!( + Arc::as_ptr(&dataset.object_store().inner), + Arc::as_ptr(&dataset3.object_store().inner) + ); + + // Remove both datasets + drop(dataset3); + assert_eq!(registry.active_stores().len(), 1); + drop(dataset2); + drop(dataset); + assert_eq!(registry.active_stores().len(), 0); + } } diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 6667f608b1b..4026da9b16b 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -39,7 +39,6 @@ pub struct DatasetBuilder { options: ObjectStoreParams, version: Option, table_uri: String, - object_store_registry: Arc, } impl DatasetBuilder { @@ -53,7 +52,6 @@ impl DatasetBuilder { session: None, version: None, manifest: None, - object_store_registry: Arc::new(ObjectStoreRegistry::default()), } } } @@ -181,8 +179,6 @@ impl DatasetBuilder { self.commit_handler = Some(commit_handler); } - self.object_store_registry = read_params.object_store_registry.clone(); - self } @@ -196,8 +192,6 @@ impl DatasetBuilder { self.commit_handler = Some(commit_handler); } - self.object_store_registry = write_params.object_store_registry.clone(); - self } @@ -211,13 +205,10 @@ impl DatasetBuilder { self } - pub fn with_object_store_registry(mut self, registry: Arc) -> Self { - self.object_store_registry = registry; - self - } - /// Build a lance object store for the given config - pub async fn build_object_store(self) -> Result<(ObjectStore, Path, Arc)> { + pub async fn build_object_store( + self, + ) -> Result<(Arc, Path, Arc)> { let commit_handler = match self.commit_handler { Some(commit_handler) => Ok(commit_handler), None => commit_handler_from_url(&self.table_uri, &Some(self.options.clone())).await, @@ -231,10 +222,16 @@ impl DatasetBuilder { .unwrap_or_default(); let download_retry_count = storage_options.download_retry_count(); + let store_registry = self + .session + .as_ref() + .map(|s| s.store_registry()) + .unwrap_or_else(|| Arc::new(ObjectStoreRegistry::default())); + #[allow(deprecated)] match &self.options.object_store { Some(store) => Ok(( - ObjectStore::new( + Arc::new(ObjectStore::new( store.0.clone(), store.1.clone(), self.options.block_size, @@ -245,13 +242,13 @@ impl DatasetBuilder { // cloud-like DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, - ), + )), Path::from(store.1.path()), commit_handler, )), None => { let (store, path) = ObjectStore::from_uri_and_params( - self.object_store_registry.clone(), + store_registry, &self.table_uri, &self.options, ) @@ -268,6 +265,7 @@ impl DatasetBuilder { None => Arc::new(Session::new( self.index_cache_size, self.metadata_cache_size, + Default::default(), )), }; @@ -286,7 +284,7 @@ impl DatasetBuilder { Ref::Version(v) => Some(v), Ref::Tag(t) => { let tags = Tags::new( - Arc::new(object_store.clone()), + object_store.clone(), commit_handler.clone(), base_path.clone(), ); @@ -326,7 +324,7 @@ impl DatasetBuilder { }; Dataset::checkout_manifest( - Arc::new(object_store), + object_store, base_path, table_uri, manifest, diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 213e4a8ee33..77bd34840b4 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -16,7 +16,6 @@ use lance_table::format::{DataFile, Fragment}; use lance_table::io::manifest::ManifestDescribing; use snafu::location; use std::borrow::Cow; -use std::sync::Arc; use uuid::Uuid; use crate::dataset::builder::DatasetBuilder; @@ -90,7 +89,7 @@ impl<'a> FragmentCreateBuilder<'a> { Self::validate_schema(&schema, stream.schema().as_ref())?; let (object_store, base_path) = ObjectStore::from_uri_and_params( - params.object_store_registry.clone(), + params.store_registry(), self.dataset_uri, ¶ms.store_params.clone().unwrap_or_default(), ) @@ -156,13 +155,13 @@ impl<'a> FragmentCreateBuilder<'a> { Self::validate_schema(&schema, stream.schema().as_ref())?; let (object_store, base_path) = ObjectStore::from_uri_and_params( - params.object_store_registry.clone(), + params.store_registry(), self.dataset_uri, ¶ms.store_params.clone().unwrap_or_default(), ) .await?; do_write_fragments( - Arc::new(object_store), + object_store, &base_path, &schema, stream, @@ -192,7 +191,7 @@ impl<'a> FragmentCreateBuilder<'a> { Self::validate_schema(&schema, stream.schema().as_ref())?; let (object_store, base_path) = ObjectStore::from_uri_and_params( - params.object_store_registry.clone(), + params.store_registry(), self.dataset_uri, ¶ms.store_params.clone().unwrap_or_default(), ) diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index 56c211d4c7d..ec0ce4f25ab 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -78,7 +78,7 @@ impl LanceIndexStoreExt for LanceIndexStore { fn from_dataset(dataset: &Dataset, uuid: &str) -> Self { let index_dir = dataset.indices_dir().child(uuid); Self::new( - dataset.object_store.as_ref().clone(), + dataset.object_store.clone(), index_dir, dataset.session.file_metadata_cache.clone(), ) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 3aeefe40598..57e85f7635f 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -187,8 +187,6 @@ pub struct WriteParams { /// Default is False. pub enable_v2_manifest_paths: bool, - pub object_store_registry: Arc, - pub session: Option>, /// If Some and this is a new dataset, old dataset versions will be @@ -215,7 +213,6 @@ impl Default for WriteParams { data_storage_version: None, enable_move_stable_row_ids: false, enable_v2_manifest_paths: false, - object_store_registry: Arc::new(ObjectStoreRegistry::default()), session: None, auto_cleanup: Some(AutoCleanupParams::default()), } @@ -235,6 +232,13 @@ impl WriteParams { pub fn storage_version_or_default(&self) -> LanceFileVersion { self.data_storage_version.unwrap_or_default() } + + pub fn store_registry(&self) -> Arc { + self.session + .as_ref() + .map(|s| s.store_registry()) + .unwrap_or_default() + } } /// Writes the given data to the dataset and returns fragments. @@ -413,7 +417,6 @@ pub async fn write_fragments_internal( enable_move_stable_row_ids: true, // This shouldn't really matter since all commits are detached enable_v2_manifest_paths: true, - object_store_registry: params.object_store_registry.clone(), max_bytes_per_file: params.max_bytes_per_file, max_rows_per_file: params.max_rows_per_file, ..Default::default() diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index cdd7a5a5063..38dd7d21690 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -175,7 +175,7 @@ impl<'a> CommitBuilder<'a> { &self.store_params.clone().unwrap_or_default(), ) .await?; - let mut object_store = Arc::new(object_store); + let mut object_store = object_store; let commit_handler = if self.commit_handler.is_some() && self.object_store.is_some() { self.commit_handler.as_ref().unwrap().clone() @@ -203,7 +203,6 @@ impl<'a> CommitBuilder<'a> { .with_read_params(ReadParams { store_options: self.store_params.clone(), commit_handler: self.commit_handler.clone(), - object_store_registry: self.object_store_registry.clone(), ..Default::default() }) .with_session(session.clone()); @@ -423,7 +422,6 @@ pub struct BatchCommitResult { mod tests { use arrow::array::{Int32Array, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; - use lance_io::object_store::providers::memory::PersistentMemoryStoreProvider; use lance_table::{ format::{DataFile, Fragment}, io::commit::ConditionalPutCommitHandler, @@ -466,16 +464,8 @@ mod tests { // Need to use in-memory for accurate IOPS tracking. use crate::utils::test::IoTrackingStore; - let mut store_registry = ObjectStoreRegistry::empty(); - let memory_store = Arc::new(object_store::memory::InMemory::new()); - store_registry.insert( - "memory", - Arc::new(PersistentMemoryStoreProvider { - inner: memory_store, - }), - ); - let store_registry = Arc::new(store_registry); - + let session = Arc::new(Session::default()); + let store_registry = session.store_registry(); // Create new dataset let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "i", @@ -496,7 +486,7 @@ mod tests { .with_params(&WriteParams { store_params: Some(store_params.clone()), commit_handler: Some(Arc::new(ConditionalPutCommitHandler)), - object_store_registry: store_registry.clone(), + session: Some(session.clone()), ..Default::default() }) .execute(vec![batch]) diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 7d01f99bdb3..a73ecd9e0ea 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -121,7 +121,6 @@ impl<'a> InsertBuilder<'a> { let mut commit_builder = CommitBuilder::new(context.dest.clone()) .use_move_stable_row_ids(context.params.enable_move_stable_row_ids) .with_storage_format(context.storage_version) - .with_object_store_registry(context.params.object_store_registry.clone()) .enable_v2_manifest_paths(context.params.enable_v2_manifest_paths) .with_commit_handler(context.commit_handler.clone()) .with_object_store(context.object_store.clone()); @@ -360,8 +359,13 @@ impl<'a> InsertBuilder<'a> { dataset.commit_handler.clone(), ), WriteDestination::Uri(uri) => { + let registry = params + .session + .as_ref() + .map(|s| s.store_registry()) + .unwrap_or_else(|| Arc::new(Default::default())); let (object_store, base_path) = ObjectStore::from_uri_and_params( - params.object_store_registry.clone(), + registry, uri, ¶ms.store_params.clone().unwrap_or_default(), ) @@ -372,7 +376,7 @@ impl<'a> InsertBuilder<'a> { ¶ms.store_params, ) .await?; - (Arc::new(object_store), base_path, commit_handler) + (object_store, base_path, commit_handler) } }; let dest = match &self.dest { @@ -382,7 +386,6 @@ impl<'a> InsertBuilder<'a> { let builder = DatasetBuilder::from_uri(uri).with_read_params(ReadParams { store_options: params.store_params.clone(), commit_handler: params.commit_handler.clone(), - object_store_registry: params.object_store_registry.clone(), ..Default::default() }); diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index 173b9fa6b1c..14a686740d3 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -8,6 +8,7 @@ use deepsize::DeepSizeOf; use lance_core::cache::FileMetadataCache; use lance_core::{Error, Result}; use lance_index::IndexType; +use lance_io::object_store::ObjectStoreRegistry; use snafu::location; use crate::dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE}; @@ -27,6 +28,8 @@ pub struct Session { pub(crate) file_metadata_cache: FileMetadataCache, pub(crate) index_extensions: HashMap<(IndexType, String), Arc>, + + pub(crate) store_registry: Arc, } impl std::fmt::Debug for Session { @@ -57,11 +60,16 @@ impl Session { /// Parameters: /// /// - ***index_cache_size***: the size of the index cache. - pub fn new(index_cache_size: usize, metadata_cache_size: usize) -> Self { + pub fn new( + index_cache_size: usize, + metadata_cache_size: usize, + store_registry: Arc, + ) -> Self { Self { index_cache: IndexCache::new(index_cache_size), file_metadata_cache: FileMetadataCache::new(metadata_cache_size), index_extensions: HashMap::new(), + store_registry, } } @@ -126,6 +134,10 @@ impl Session { + self.file_metadata_cache.approx_size() + self.index_extensions.len() } + + pub fn store_registry(&self) -> Arc { + self.store_registry.clone() + } } impl Default for Session { @@ -134,6 +146,7 @@ impl Default for Session { index_cache: IndexCache::new(DEFAULT_INDEX_CACHE_SIZE), file_metadata_cache: FileMetadataCache::new(DEFAULT_METADATA_CACHE_SIZE), index_extensions: HashMap::new(), + store_registry: Arc::new(ObjectStoreRegistry::default()), } } } @@ -152,7 +165,7 @@ mod tests { #[test] fn test_disable_index_cache() { - let no_cache = Session::new(0, 0); + let no_cache = Session::new(0, 0, Default::default()); assert!(no_cache.index_cache.get_vector("abc").is_none()); let no_cache = Arc::new(no_cache); @@ -173,7 +186,7 @@ mod tests { #[test] fn test_basic() { - let session = Session::new(10, 1); + let session = Session::new(10, 1, Default::default()); let session = Arc::new(session); let pq = ProductQuantizer::new( From eac3509ae301256cc11067811ba8fb8c311462bf Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 16 Apr 2025 14:52:24 -0700 Subject: [PATCH 03/13] try this --- python/src/dataset.rs | 6 +- python/src/file.rs | 8 +-- python/src/indices.rs | 1 + rust/lance-io/src/object_store.rs | 7 ++- rust/lance-io/src/object_store/providers.rs | 70 ++++++++++++++++++--- rust/lance-io/tests/gcs_integration.rs | 4 +- rust/lance/src/session.rs | 7 ++- 7 files changed, 82 insertions(+), 21 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index d32e33f7146..270da8f2803 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2196,11 +2196,7 @@ impl PyFullTextQuery { #[staticmethod] #[pyo3(signature = (positive, negative,negative_boost=None))] - fn boost_query( - positive: PyFullTextQuery, - negative: PyFullTextQuery, - negative_boost: Option, - ) -> PyResult { + fn boost_query(positive: Self, negative: Self, negative_boost: Option) -> PyResult { Ok(Self { inner: BoostQuery::new(positive.inner, negative.inner, negative_boost).into(), }) diff --git a/python/src/file.rs b/python/src/file.rs index eaff3ecade7..222965f85ea 100644 --- a/python/src/file.rs +++ b/python/src/file.rs @@ -333,7 +333,7 @@ fn path_to_parent(path: &Path) -> PyResult<(Path, String)> { pub async fn object_store_from_uri_or_path_no_options( uri_or_path: impl AsRef, -) -> PyResult<(ObjectStore, Path)> { +) -> PyResult<(Arc, Path)> { object_store_from_uri_or_path(uri_or_path, None).await } @@ -344,7 +344,7 @@ pub async fn object_store_from_uri_or_path_no_options( pub async fn object_store_from_uri_or_path( uri_or_path: impl AsRef, storage_options: Option>, -) -> PyResult<(ObjectStore, Path)> { +) -> PyResult<(Arc, Path)> { if let Ok(mut url) = Url::parse(uri_or_path.as_ref()) { if url.scheme().len() > 1 { let path = object_store::path::Path::parse(url.path()).map_err(|e| { @@ -376,7 +376,7 @@ pub async fn object_store_from_uri_or_path( let path = Path::parse(uri_or_path.as_ref()).map_err(|e| { PyIOError::new_err(format!("Invalid path `{}`: {}", uri_or_path.as_ref(), e)) })?; - let object_store = ObjectStore::local(); + let object_store = Arc::new(ObjectStore::local()); Ok((object_store, path)) } @@ -393,7 +393,7 @@ impl LanceFileReader { let (object_store, path) = object_store_from_uri_or_path(uri_or_path, storage_options).await?; let scheduler = ScanScheduler::new( - Arc::new(object_store), + object_store, SchedulerConfig { io_buffer_size_bytes: 2 * 1024 * 1024 * 1024, }, diff --git a/python/src/indices.rs b/python/src/indices.rs index 3df7791d481..3edb948356e 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -242,6 +242,7 @@ pub fn transform_vectors( )? } +#[allow(deprecated)] async fn do_shuffle_transformed_vectors( unsorted_filenames: Vec, dir_path: &str, diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 2d343aef656..8ac2b5c00a5 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -173,6 +173,7 @@ impl Default for ObjectStoreParams { impl std::hash::Hash for ObjectStoreParams { #[allow(deprecated)] fn hash(&self, state: &mut H) { + // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper self.block_size.hash(state); if let Some((store, url)) = &self.object_store { Arc::as_ptr(store).hash(state); @@ -202,6 +203,7 @@ impl Eq for ObjectStoreParams {} impl PartialEq for ObjectStoreParams { #[allow(deprecated)] fn eq(&self, other: &Self) -> bool { + // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper self.block_size == other.block_size && self .object_store @@ -271,8 +273,9 @@ impl ObjectStore { /// /// Returns the ObjectStore instance and the absolute path to the object. /// - /// This uses the default [ObjectStoreRegistry] to find the object store. - /// To re-use object store instances, use [Self::from_uri_and_params]. + /// This uses the default [ObjectStoreRegistry] to find the object store. To + /// allow for potential re-use of object store instances, it's recommended to + /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params]. pub async fn from_uri(uri: &str) -> Result<(Arc, Path)> { let registry = Arc::new(ObjectStoreRegistry::default()); diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 6980bff3309..0a8a043d688 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -27,6 +27,25 @@ pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result; } +/// A registry of object store providers. +/// +/// Use [`Self::default()`] to create one with the available default providers. +/// This includes (depending on features enabled): +/// - `memory`: An in-memory object store. +/// - `file`: A local file object store, with optimized code paths. +/// - `file-object-store`: A local file object store that uses the ObjectStore API, +/// for all operations. Used for testing with ObjectStore wrappers. +/// - `s3`: An S3 object store. +/// - `s3+ddb`: An S3 object store with DynamoDB for metadata. +/// - `az`: An Azure Blob Storage object store. +/// - `gs`: A Google Cloud Storage object store. +/// +/// Use [`Self::empty()`] to create an empty registry, with no providers registered. +/// +/// The registry also caches object stores that are currently in use. It holds +/// weak references to the object stores, so they are not held onto. If an object +/// store is no longer in use, it will be removed from the cache on the next +/// call to either [`Self::active_stores()`] or [`Self::get_store()`]. #[derive(Debug)] pub struct ObjectStoreRegistry { providers: RwLock>>, @@ -75,20 +94,18 @@ fn cache_url(url: &Url) -> String { // object stores, but we want to cache the object store itself. format!("{}://", url.scheme()) } else { - // drop path after bucket, but keep query params - // e.g. s3://bucket/path?param=value -> s3://bucket?param=value + // Bucket is parsed as domain, so we just drop the path. let mut url = url.clone(); - let first_segment = url - .path_segments() - .and_then(|mut iter| iter.next().map(|s| s.to_string())); - if let Some(first_segment) = first_segment { - url.set_path(&first_segment); - } + url.set_path(""); url.to_string() } } impl ObjectStoreRegistry { + /// Create a new registry with no providers registered. + /// + /// Typically, you want to use [`Self::default()`] instead, so you get the + /// default providers. pub fn empty() -> Self { Self { providers: RwLock::new(HashMap::new()), @@ -96,6 +113,7 @@ impl ObjectStoreRegistry { } } + /// Get the object store provider for a given scheme. pub fn get_provider(&self, scheme: &str) -> Option> { self.providers .read() @@ -104,6 +122,10 @@ impl ObjectStoreRegistry { .cloned() } + /// Get a list of all active object stores. + /// + /// Calling this will also clean up any weak references to object stores that + /// are no longer valid. pub fn active_stores(&self) -> Vec> { let mut found_inactive = false; let output = self @@ -131,6 +153,11 @@ impl ObjectStoreRegistry { output } + /// Get an object store for a given base path and parameters. + /// + /// If the object store is already in use, it will return a strong reference + /// to the object store. If the object store is not in use, it will create a + /// new object store and return a strong reference to it. pub async fn get_store( &self, base_path: Url, @@ -240,6 +267,8 @@ impl Default for ObjectStoreRegistry { } impl ObjectStoreRegistry { + /// Add a new object store provider to the registry. This will be called + /// in [`Self::get_store()`] when a URL is passed with a matching scheme. pub fn insert(&self, scheme: &str, provider: Arc) { self.providers .write() @@ -247,3 +276,28 @@ impl ObjectStoreRegistry { .insert(scheme.into(), provider); } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cache_url() { + let cases = [ + ("s3://bucket/path?param=value", "s3://bucket?param=value"), + ("file:///path/to/file", "file://"), + ("file-object-store:///path/to/file", "file-object-store://"), + ("memory:///", "memory://"), + ( + "http://example.com/path?param=value", + "http://example.com/?param=value", + ), + ]; + + for (url, expected_cache_url) in cases { + let url = Url::parse(url).unwrap(); + let cache_url = cache_url(&url); + assert_eq!(cache_url, expected_cache_url); + } + } +} diff --git a/rust/lance-io/tests/gcs_integration.rs b/rust/lance-io/tests/gcs_integration.rs index 92137322441..959a3a61086 100644 --- a/rust/lance-io/tests/gcs_integration.rs +++ b/rust/lance-io/tests/gcs_integration.rs @@ -4,13 +4,15 @@ //! They do not work against any local emulator right now. #![cfg(feature = "gcs-test")] +use std::sync::Arc; + // TODO: Once we re-use this logic for S3, we can instead use tests against // Minio to validate the multipart upload logic. use lance_io::object_store::ObjectStore; use object_store::path::Path; use tokio::io::AsyncWriteExt; -async fn get_store() -> ObjectStore { +async fn get_store() -> Arc { let bucket_name = std::env::var("OBJECT_STORE_BUCKET").unwrap_or_else(|_| "test-bucket".into()); ObjectStore::from_uri(&format!("gs://{}/object", bucket_name)) .await diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index 14a686740d3..2f751bdffdf 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -29,7 +29,7 @@ pub struct Session { pub(crate) index_extensions: HashMap<(IndexType, String), Arc>, - pub(crate) store_registry: Arc, + store_registry: Arc, } impl std::fmt::Debug for Session { @@ -60,6 +60,10 @@ impl Session { /// Parameters: /// /// - ***index_cache_size***: the size of the index cache. + /// - ***metadata_cache_size***: the size of the metadata cache. + /// - ***store_registry***: the object store registry to use when opening + /// datasets. This determines which schemes are available, and also allows + /// re-using object stores. pub fn new( index_cache_size: usize, metadata_cache_size: usize, @@ -135,6 +139,7 @@ impl Session { + self.index_extensions.len() } + /// Get the object store registry. pub fn store_registry(&self) -> Arc { self.store_registry.clone() } From 3ac31a03b43a3d26f0f313125f1bd90dcc158ebc Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 16 Apr 2025 15:56:06 -0700 Subject: [PATCH 04/13] more changes --- java/core/lance-jni/src/blocking_dataset.rs | 3 +- rust/lance/src/dataset.rs | 16 ++++---- rust/lance/src/dataset/builder.rs | 9 ++--- rust/lance/src/dataset/fragment.rs | 23 +++++------ rust/lance/src/dataset/write/commit.rs | 43 ++++++--------------- rust/lance/src/utils/test.rs | 5 +-- 6 files changed, 37 insertions(+), 62 deletions(-) diff --git a/java/core/lance-jni/src/blocking_dataset.rs b/java/core/lance-jni/src/blocking_dataset.rs index 614e6a6d745..3a573620625 100644 --- a/java/core/lance-jni/src/blocking_dataset.rs +++ b/java/core/lance-jni/src/blocking_dataset.rs @@ -116,7 +116,6 @@ impl BlockingDataset { read_version: Option, storage_options: HashMap, ) -> Result { - let object_store_registry = Arc::new(ObjectStoreRegistry::default()); let inner = RT.block_on(Dataset::commit( uri, operation, @@ -126,7 +125,7 @@ impl BlockingDataset { ..Default::default() }), None, - object_store_registry, + Default::default(), false, // TODO: support enable_v2_manifest_paths ))?; Ok(Self { inner }) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index dec534036ed..81002732125 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -22,7 +22,7 @@ use lance_datafusion::projection::ProjectionPlan; use lance_file::datatypes::populate_schema_dictionary; use lance_file::version::LanceFileVersion; use lance_index::DatasetIndexExt; -use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; +use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_io::object_writer::{ObjectWriter, WriteResult}; use lance_io::traits::WriteExt; use lance_io::utils::{read_last_block, read_metadata_offset, read_struct}; @@ -665,7 +665,7 @@ impl Dataset { read_version: Option, store_params: Option, commit_handler: Option>, - object_store_registry: Arc, + session: Arc, enable_v2_manifest_paths: bool, detached: bool, ) -> Result { @@ -683,8 +683,8 @@ impl Dataset { let transaction = Transaction::new(read_version, operation, blobs_op, None); let mut builder = CommitBuilder::new(base_uri) - .with_object_store_registry(object_store_registry) .enable_v2_manifest_paths(enable_v2_manifest_paths) + .with_session(session) .with_detached(detached); if let Some(store_params) = store_params { @@ -738,7 +738,7 @@ impl Dataset { read_version: Option, store_params: Option, commit_handler: Option>, - object_store_registry: Arc, + session: Arc, enable_v2_manifest_paths: bool, ) -> Result { Self::do_commit( @@ -750,7 +750,7 @@ impl Dataset { read_version, store_params, commit_handler, - object_store_registry, + session, enable_v2_manifest_paths, /*detached=*/ false, ) @@ -771,7 +771,7 @@ impl Dataset { read_version: Option, store_params: Option, commit_handler: Option>, - object_store_registry: Arc, + session: Arc, enable_v2_manifest_paths: bool, ) -> Result { Self::do_commit( @@ -783,7 +783,7 @@ impl Dataset { read_version, store_params, commit_handler, - object_store_registry, + session, enable_v2_manifest_paths, /*detached=*/ true, ) @@ -3156,7 +3156,7 @@ mod tests { None, None, None, - Arc::new(ObjectStoreRegistry::default()), + Default::default(), true, // enable_v2_manifest_paths ) .await diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 4026da9b16b..81de2b42afb 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -4,8 +4,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use lance_file::datatypes::populate_schema_dictionary; use lance_io::object_store::{ - ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptions, - DEFAULT_CLOUD_IO_PARALLELISM, + ObjectStore, ObjectStoreParams, StorageOptions, DEFAULT_CLOUD_IO_PARALLELISM, }; use lance_table::{ format::Manifest, @@ -226,7 +225,7 @@ impl DatasetBuilder { .session .as_ref() .map(|s| s.store_registry()) - .unwrap_or_else(|| Arc::new(ObjectStoreRegistry::default())); + .unwrap_or_default(); #[allow(deprecated)] match &self.options.object_store { @@ -260,8 +259,8 @@ impl DatasetBuilder { #[instrument(skip_all)] pub async fn load(mut self) -> Result { - let session = match self.session.take() { - Some(session) => session, + let session = match self.session.as_ref() { + Some(session) => session.clone(), None => Arc::new(Session::new( self.index_cache_size, self.metadata_cache_size, diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 8575029c077..605bac302c3 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2233,7 +2233,6 @@ mod tests { use lance_core::ROW_ID; use lance_datagen::{array, gen, RowCount}; use lance_file::version::LanceFileVersion; - use lance_io::object_store::ObjectStoreRegistry; use pretty_assertions::assert_eq; use rstest::rstest; use tempfile::tempdir; @@ -2822,10 +2821,10 @@ mod tests { config_upsert_values: None, }; - let registry = Arc::new(ObjectStoreRegistry::default()); - let new_dataset = Dataset::commit(test_uri, op, None, None, None, registry, false) - .await - .unwrap(); + let new_dataset = + Dataset::commit(test_uri, op, None, None, None, Default::default(), false) + .await + .unwrap(); assert_eq!(new_dataset.count_rows(None).await.unwrap(), dataset_rows); @@ -2931,10 +2930,10 @@ mod tests { config_upsert_values: None, }; - let registry = Arc::new(ObjectStoreRegistry::default()); - let dataset = Dataset::commit(test_uri, op, None, None, None, registry, false) - .await - .unwrap(); + let dataset = + Dataset::commit(test_uri, op, None, None, None, Default::default(), false) + .await + .unwrap(); // We only kept the first fragment of 40 rows assert_eq!( @@ -3152,7 +3151,6 @@ mod tests { // Rearrange schema so it's `s` then `i`. let schema = updater.schema().unwrap().clone().project(&["s", "i"])?; - let registry = Arc::new(ObjectStoreRegistry::default()); let dataset = Dataset::commit( test_uri, @@ -3163,7 +3161,7 @@ mod tests { Some(dataset.manifest.version), None, None, - registry, + Default::default(), false, ) .await?; @@ -3292,14 +3290,13 @@ mod tests { let op = Operation::Append { fragments: vec![frag], }; - let object_store_registry = Arc::new(ObjectStoreRegistry::default()); let dataset = Dataset::commit( &dataset.uri, op, Some(dataset.version().version), None, None, - object_store_registry, + Default::default(), false, ) .await diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 38dd7d21690..b80addd9f3f 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use lance_file::version::LanceFileVersion; -use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; +use lance_io::object_store::{ObjectStore, ObjectStoreParams}; use lance_table::{ format::{is_detached_version, DataStorageFormat}, io::commit::{CommitConfig, CommitHandler, ManifestNamingScheme}, @@ -36,7 +36,6 @@ pub struct CommitBuilder<'a> { storage_format: Option, commit_handler: Option>, store_params: Option, - object_store_registry: Arc, object_store: Option>, session: Option>, detached: bool, @@ -52,7 +51,6 @@ impl<'a> CommitBuilder<'a> { storage_format: None, commit_handler: None, store_params: None, - object_store_registry: Default::default(), object_store: None, session: None, detached: false, @@ -104,17 +102,6 @@ impl<'a> CommitBuilder<'a> { self } - /// Pass an object store registry to use. - /// - /// If an object store is passed, this registry will be ignored. - pub fn with_object_store_registry( - mut self, - object_store_registry: Arc, - ) -> Self { - self.object_store_registry = object_store_registry; - self - } - /// Pass a session to use for the dataset. /// /// If a session is not passed, but a dataset is used as the destination, @@ -162,6 +149,11 @@ impl<'a> CommitBuilder<'a> { } pub async fn execute(self, transaction: Transaction) -> Result { + let session = self + .session + .or_else(|| self.dest.dataset().map(|ds| ds.session.clone())) + .unwrap_or_default(); + let (object_store, base_path, commit_handler) = match &self.dest { WriteDestination::Dataset(dataset) => ( dataset.object_store.clone(), @@ -170,7 +162,7 @@ impl<'a> CommitBuilder<'a> { ), WriteDestination::Uri(uri) => { let (object_store, base_path) = ObjectStore::from_uri_and_params( - self.object_store_registry.clone(), + session.store_registry(), uri, &self.store_params.clone().unwrap_or_default(), ) @@ -190,11 +182,6 @@ impl<'a> CommitBuilder<'a> { } }; - let session = self - .session - .or_else(|| self.dest.dataset().map(|ds| ds.session.clone())) - .unwrap_or_default(); - let dest = match &self.dest { WriteDestination::Dataset(dataset) => WriteDestination::Dataset(dataset.clone()), WriteDestination::Uri(uri) => { @@ -422,10 +409,7 @@ pub struct BatchCommitResult { mod tests { use arrow::array::{Int32Array, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; - use lance_table::{ - format::{DataFile, Fragment}, - io::commit::ConditionalPutCommitHandler, - }; + use lance_table::format::{DataFile, Fragment}; use crate::dataset::{InsertBuilder, WriteParams}; @@ -465,7 +449,6 @@ mod tests { use crate::utils::test::IoTrackingStore; let session = Arc::new(Session::default()); - let store_registry = session.store_registry(); // Create new dataset let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "i", @@ -485,7 +468,6 @@ mod tests { let dataset = InsertBuilder::new("memory://test") .with_params(&WriteParams { store_params: Some(store_params.clone()), - commit_handler: Some(Arc::new(ConditionalPutCommitHandler)), session: Some(session.clone()), ..Default::default() }) @@ -533,8 +515,6 @@ mod tests { // Commit transaction with URI and session let new_ds = CommitBuilder::new("memory://test") .with_store_params(store_params.clone()) - .with_object_store_registry(store_registry.clone()) - .with_commit_handler(Arc::new(ConditionalPutCommitHandler)) .with_session(dataset.session.clone()) .execute(sample_transaction(1)) .await @@ -547,11 +527,12 @@ mod tests { assert_eq!(reads, 3); assert_eq!(writes, 2); - // Commit transaction with URI and no session + // Commit transaction with URI and new session. Re-use the store + // registry so we see the same store. + let new_session = Arc::new(Session::new(0, 0, session.store_registry())); let new_ds = CommitBuilder::new("memory://test") .with_store_params(store_params) - .with_commit_handler(Arc::new(ConditionalPutCommitHandler)) - .with_object_store_registry(store_registry.clone()) + .with_session(new_session) .execute(sample_transaction(1)) .await .unwrap(); diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 5394158eec3..324376d2812 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -13,7 +13,7 @@ use lance_arrow::RecordBatchExt; use lance_core::datatypes::Schema; use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RowCount}; use lance_file::version::LanceFileVersion; -use lance_io::object_store::{ObjectStoreRegistry, WrappingObjectStore}; +use lance_io::object_store::WrappingObjectStore; use lance_table::format::Fragment; use object_store::path::Path; use object_store::{ @@ -118,14 +118,13 @@ impl TestDatasetGenerator { config_upsert_values: None, }; - let registry = Arc::new(ObjectStoreRegistry::default()); Dataset::commit( uri, operation, None, Default::default(), None, - registry, + Default::default(), false, ) .await From 3fe5311ae8983f4555fdf9c6c09da39f5ae03bf6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 16 Apr 2025 15:57:26 -0700 Subject: [PATCH 05/13] try this to check windows --- rust/lance/src/dataset.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 81002732125..6fd30a3ce7b 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3162,6 +3162,8 @@ mod tests { .await .unwrap(); + dbg!(&dataset.base); + assert!(dataset.manifest_naming_scheme == ManifestNamingScheme::V2); assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); From 2e138e9f73552339faee233ebcda240db7a4ed2c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 16 Apr 2025 16:00:49 -0700 Subject: [PATCH 06/13] test more on mac and windows --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 85d21282fcf..79b91b116f7 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -173,7 +173,7 @@ jobs: - nightly defaults: run: - working-directory: ./rust/lance + working-directory: ./rust steps: - uses: actions/checkout@v4 - uses: Swatinem/rust-cache@v2 @@ -200,7 +200,7 @@ jobs: runs-on: windows-latest defaults: run: - working-directory: rust/lance + working-directory: rust steps: - uses: actions/checkout@v4 - uses: Swatinem/rust-cache@v2 From 69ca0aad99c57ae42cb1514cc048e67ca015ce3a Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 16 Apr 2025 16:35:27 -0700 Subject: [PATCH 07/13] try again for windows --- rust/lance-io/src/object_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 8ac2b5c00a5..c96e241d184 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -1006,7 +1006,7 @@ mod tests { format!("{drive_letter}:\\test_folder\\test.lance"), ] { let (store, base) = ObjectStore::from_uri(uri).await.unwrap(); - let contents = read_from_store(store, &base.child("test_file")) + let contents = read_from_store(store.as_ref(), &base.child("test_file")) .await .unwrap(); assert_eq!(contents, "WINDOWS"); From bbd2f7da013f11e3cd97d76b5ea0cc3ccd5fdb08 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Apr 2025 09:10:36 -0700 Subject: [PATCH 08/13] fix deadlock --- .github/workflows/rust.yml | 2 ++ rust/lance-io/src/object_store.rs | 1 + rust/lance-io/src/object_store/providers.rs | 5 +++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 79b91b116f7..cdc3bbca3f4 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -215,6 +215,8 @@ jobs: shell: powershell - name: Build tests run: cargo test --locked --no-run + - name: Run lance-io tests first + run: cargo test --lib lance-io - name: Run tests run: cargo test - name: Check benchmarks diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index c96e241d184..70faccaa274 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -230,6 +230,7 @@ fn uri_to_url(uri: &str) -> Result { // On Windows, the drive is parsed as a scheme local_path_to_url(uri) } + Ok(url) if url.scheme() == "file" => local_path_to_url(uri), Ok(url) => Ok(url), Err(_) => local_path_to_url(uri), } diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 0a8a043d688..2e6c91baeb2 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -168,12 +168,13 @@ impl ObjectStoreRegistry { // Check if we have a cached store for this base path and params { - if let Some(store) = self + let maybe_store = self .active_stores .read() .expect("ObjectStoreRegistry lock poisoned") .get(&cache_key) - { + .cloned(); + if let Some(store) = maybe_store { if let Some(store) = store.upgrade() { return Ok(store); } else { From 1accfe2336b76cf900c1fabe3d5bea59509cfdcd Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Apr 2025 12:13:00 -0700 Subject: [PATCH 09/13] fix --- .github/workflows/rust.yml | 2 +- rust/lance-io/src/object_store.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index cdc3bbca3f4..4a367266b35 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -216,7 +216,7 @@ jobs: - name: Build tests run: cargo test --locked --no-run - name: Run lance-io tests first - run: cargo test --lib lance-io + run: cargo test -p lance-io - name: Run tests run: cargo test - name: Check benchmarks diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 70faccaa274..c96e241d184 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -230,7 +230,6 @@ fn uri_to_url(uri: &str) -> Result { // On Windows, the drive is parsed as a scheme local_path_to_url(uri) } - Ok(url) if url.scheme() == "file" => local_path_to_url(uri), Ok(url) => Ok(url), Err(_) => local_path_to_url(uri), } From 36388e58c84bceffc516bf35fd48c04e1084e4ce Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Apr 2025 13:29:15 -0700 Subject: [PATCH 10/13] fix path handling on windows --- .github/workflows/rust.yml | 2 -- rust/lance-io/src/object_store.rs | 58 ++++++++++++++++++++++++------- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4a367266b35..79b91b116f7 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -215,8 +215,6 @@ jobs: shell: powershell - name: Build tests run: cargo test --locked --no-run - - name: Run lance-io tests first - run: cargo test -p lance-io - name: Run tests run: cargo test - name: Check benchmarks diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index c96e241d184..bb249571a77 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -235,8 +235,8 @@ fn uri_to_url(uri: &str) -> Result { } } -fn local_path_to_url(str_path: &str) -> Result { - let expanded = tilde(str_path).to_string(); +fn expand_path(str_path: impl AsRef) -> Result { + let expanded = tilde(str_path.as_ref()).to_string(); let mut expanded_path = path_abs::PathAbs::new(expanded) .unwrap() @@ -249,6 +249,12 @@ fn local_path_to_url(str_path: &str) -> Result { } } + Ok(expanded_path) +} + +fn local_path_to_url(str_path: &str) -> Result { + let expanded_path = expand_path(str_path)?; + Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput { source: format!("Invalid table location: '{}'", str_path).into(), location: location!(), @@ -256,15 +262,21 @@ fn local_path_to_url(str_path: &str) -> Result { } fn extract_path(url: &Url) -> Path { - if url.scheme() == "memory" { - let mut output = String::new(); - if let Some(domain) = url.domain() { - output.push_str(domain); + match url.scheme() { + "memory" => { + let mut output = String::new(); + if let Some(domain) = url.domain() { + output.push_str(domain); + } + output.push_str(url.path()); + Path::from(output) } - output.push_str(url.path()); - Path::from(output) - } else { - Path::from(url.path()) + "file" | "file-object-store" => url + .to_file_path() + .ok() + .and_then(|p| Path::from_absolute_path(p).ok()) + .unwrap_or_else(|| Path::from(url.path())), + _ => Path::from(url.path()), } } @@ -402,14 +414,16 @@ impl ObjectStore { /// Create an [ObjectWriter] from local [std::path::Path] pub async fn create_local_writer(path: &std::path::Path) -> Result { let object_store = Self::local(); - let os_path = Path::from(path.to_str().unwrap()); + let absolute_path = expand_path(path.to_string_lossy())?; + let os_path = Path::from_absolute_path(absolute_path)?; object_store.create(&os_path).await } /// Open an [Reader] from local [std::path::Path] pub async fn open_local(path: &std::path::Path) -> Result> { let object_store = Self::local(); - let os_path = Path::from(path.to_str().unwrap()); + let absolute_path = expand_path(path.to_string_lossy())?; + let os_path = Path::from_absolute_path(absolute_path)?; object_store.open(&os_path).await } @@ -969,6 +983,26 @@ mod tests { } } + #[test] + #[cfg(windows)] + fn test_path_parsing_windows() { + let cases = [ + ( + "C:\\Users\\ADMINI~1\\AppData\\Local\\", + "C:/Users/ADMINI~1/AppData/Local", + ), + ( + "C:\\Users\\ADMINI~1\\AppData\\Local\\..\\", + "C:/Users/ADMINI~1/AppData", + ), + ]; + for (uri, expected_path) in cases { + let url = uri_to_url(uri).unwrap(); + let path = extract_path(&url); + assert_eq!(path.to_string(), expected_path); + } + } + #[tokio::test] #[cfg(windows)] async fn test_windows_paths() { From ff86ed1fe65ba3e3956d0b5aa369176c96e8ba2c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Apr 2025 13:39:13 -0700 Subject: [PATCH 11/13] clippy --- rust/lance/src/dataset.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6fd30a3ce7b..81002732125 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3162,8 +3162,6 @@ mod tests { .await .unwrap(); - dbg!(&dataset.base); - assert!(dataset.manifest_naming_scheme == ManifestNamingScheme::V2); assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2); From 5923818bfe762aab3b4c6c49ae4d6d4d1445949e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Apr 2025 18:16:28 -0700 Subject: [PATCH 12/13] pr feedback --- rust/lance-io/src/object_store.rs | 65 ++-------------- rust/lance-io/src/object_store/providers.rs | 75 +++++++------------ .../src/object_store/providers/aws.rs | 20 +++++ .../src/object_store/providers/azure.rs | 15 ++++ .../src/object_store/providers/gcp.rs | 15 ++++ .../src/object_store/providers/local.rs | 57 +++++++++++++- .../src/object_store/providers/memory.rs | 26 ++++++- rust/lance/src/session.rs | 14 +++- 8 files changed, 176 insertions(+), 111 deletions(-) diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index bb249571a77..e281b3d953d 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -16,6 +16,7 @@ use chrono::{DateTime, Utc}; use deepsize::DeepSizeOf; use futures::{future, stream::BoxStream, StreamExt, TryStreamExt}; use futures::{FutureExt, Stream}; +use lance_core::error::LanceOptionExt; use lance_core::utils::parse::str_is_truthy; use list_retry::ListRetryStream; #[cfg(feature = "aws")] @@ -261,25 +262,6 @@ fn local_path_to_url(str_path: &str) -> Result { }) } -fn extract_path(url: &Url) -> Path { - match url.scheme() { - "memory" => { - let mut output = String::new(); - if let Some(domain) = url.domain() { - output.push_str(domain); - } - output.push_str(url.path()); - Path::from(output) - } - "file" | "file-object-store" => url - .to_file_path() - .ok() - .and_then(|p| Path::from_absolute_path(p).ok()) - .unwrap_or_else(|| Path::from(url.path())), - _ => Path::from(url.path()), - } -} - impl ObjectStore { /// Parse from a string URI. /// @@ -321,8 +303,11 @@ impl ObjectStore { return Ok((Arc::new(store), path)); } let url = uri_to_url(uri)?; - let path = extract_path(&url); - let store = registry.get_store(url, params).await?; + let store = registry.get_store(url.clone(), params).await?; + // We know the scheme is valid if we got a store back. + let provider = registry.get_provider(url.scheme()).expect_ok()?; + let path = provider.extract_path(&url); + Ok((store, path)) } @@ -965,44 +950,6 @@ mod tests { assert_eq!(buf.as_bytes(), b"LOCAL"); } - #[test] - fn test_path_parsing() { - let cases = [ - ("file:///", ""), - ("file:///usr/local/bin", "usr/local/bin"), - ("memory://test/path", "test/path"), - ("s3://bucket/path", "path"), - ("s3+ddb://bucket/path", "path"), - ("gs://bucket/path", "path"), - ("az://account/path", "path"), - ]; - for (uri, expected_path) in cases { - let url = uri_to_url(uri).unwrap(); - let path = extract_path(&url); - assert_eq!(path.to_string(), expected_path); - } - } - - #[test] - #[cfg(windows)] - fn test_path_parsing_windows() { - let cases = [ - ( - "C:\\Users\\ADMINI~1\\AppData\\Local\\", - "C:/Users/ADMINI~1/AppData/Local", - ), - ( - "C:\\Users\\ADMINI~1\\AppData\\Local\\..\\", - "C:/Users/ADMINI~1/AppData", - ), - ]; - for (uri, expected_path) in cases { - let url = uri_to_url(uri).unwrap(); - let path = extract_path(&url); - assert_eq!(path.to_string(), expected_path); - } - } - #[tokio::test] #[cfg(windows)] async fn test_windows_paths() { diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 2e6c91baeb2..d07e4719ba2 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -6,12 +6,12 @@ use std::{ sync::{Arc, RwLock, Weak}, }; -use deepsize::DeepSizeOf; +use object_store::path::Path; use snafu::location; use url::Url; use super::{tracing::ObjectStoreTracingExt, ObjectStore, ObjectStoreParams}; -use lance_core::error::{Error, Result}; +use lance_core::error::{Error, LanceOptionExt, Result}; #[cfg(feature = "aws")] pub mod aws; @@ -25,6 +25,17 @@ pub mod memory; #[async_trait::async_trait] pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result; + + /// Extract the path relative to the base of the store. + /// + /// For example, in S3 the path is relative to the bucket. So a URL of + /// `s3://bucket/path/to/file` would return `path/to/file`. + /// + /// Meanwhile, for a file store, the path is relative to the filesystem root. + /// So a URL of `file:///path/to/file` would return `/path/to/file`. + fn extract_path(&self, url: &Url) -> Path { + Path::from(url.path()) + } } /// A registry of object store providers. @@ -55,30 +66,6 @@ pub struct ObjectStoreRegistry { active_stores: RwLock>>, } -impl DeepSizeOf for ObjectStoreRegistry { - fn deep_size_of_children(&self, _: &mut deepsize::Context) -> usize { - let mut size = 0; - let providers = self - .providers - .read() - .expect("ObjectStoreRegistry lock poisoned"); - for (key, _provider) in providers.iter() { - size += key.deep_size_of(); - // Ignore provider for simplicity - } - let active_stores = self - .active_stores - .read() - .expect("ObjectStoreRegistry lock poisoned"); - for ((base_path, _params), _store) in active_stores.iter() { - size += base_path.deep_size_of(); - // Ignore params for simplicity - size += std::mem::size_of::>(); - } - size - } -} - /// Convert a URL to a cache key. /// /// We truncate to the first path segment. This should capture @@ -171,7 +158,8 @@ impl ObjectStoreRegistry { let maybe_store = self .active_stores .read() - .expect("ObjectStoreRegistry lock poisoned") + .ok() + .expect_ok()? .get(&cache_key) .cloned(); if let Some(store) = maybe_store { @@ -194,23 +182,15 @@ impl ObjectStoreRegistry { } let scheme = base_path.scheme(); - let provider = self.get_provider(scheme).ok_or_else(|| { - let valid_schemes = self - .providers - .read() - .expect("ObjectStoreRegistry lock poisoned") - .keys() - .cloned() - .collect::>() - .join(", "); - Error::invalid_input( - format!( - "No object store provider found for scheme: '{}'\n valid_schemes: {}", - scheme, valid_schemes - ), - location!(), - ) - })?; + let Some(provider) = self.get_provider(scheme) else { + let mut message = format!("No object store provider found for scheme: '{}'", scheme); + if let Ok(providers) = self.providers.read() { + let valid_schemes = providers.keys().cloned().collect::>().join(", "); + message.push_str(&format!("\nValid schemes: {}", valid_schemes)); + } + + return Err(Error::invalid_input(message, location!())); + }; let mut store = provider.new_store(base_path, params).await?; store.inner = store.inner.traced(); @@ -223,10 +203,7 @@ impl ObjectStoreRegistry { { // Insert the store into the cache - let mut cache_lock = self - .active_stores - .write() - .expect("ObjectStoreRegistry lock poisoned"); + let mut cache_lock = self.active_stores.write().ok().expect_ok()?; cache_lock.insert(cache_key, Arc::downgrade(&store)); } @@ -268,7 +245,7 @@ impl Default for ObjectStoreRegistry { } impl ObjectStoreRegistry { - /// Add a new object store provider to the registry. This will be called + /// Add a new object store provider to the registry. The provider will be used /// in [`Self::get_store()`] when a URL is passed with a matching scheme. pub fn insert(&self, scheme: &str, provider: Arc) { self.providers diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index 6f910ce1034..d1dbbc7ac58 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -396,4 +396,24 @@ mod tests { // Not called yet assert!(mock_provider.called.load(Ordering::Relaxed)); } + + #[test] + fn test_s3_path_parsing() { + let provider = AwsStoreProvider; + + let cases = [ + ("s3://bucket/path/to/file", "path/to/file"), + ( + "s3+ddb://bucket/path/to/file?ddbTableName=test", + "path/to/file", + ), + ]; + + for (uri, expected_path) in cases { + let url = Url::parse(uri).unwrap(); + let path = provider.extract_path(&url); + let expected_path = Path::from(expected_path); + assert_eq!(path, expected_path); + } + } } diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs index b7e29247613..16412919431 100644 --- a/rust/lance-io/src/object_store/providers/azure.rs +++ b/rust/lance-io/src/object_store/providers/azure.rs @@ -81,3 +81,18 @@ impl StorageOptions { .collect() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_azure_store_path() { + let provider = AzureBlobStoreProvider; + + let url = Url::parse("az://bucket/path/to/file").unwrap(); + let path = provider.extract_path(&url); + let expected_path = object_store::path::Path::from("path/to/file"); + assert_eq!(path, expected_path); + } +} diff --git a/rust/lance-io/src/object_store/providers/gcp.rs b/rust/lance-io/src/object_store/providers/gcp.rs index f1f0d5439ea..21f4ffc955f 100644 --- a/rust/lance-io/src/object_store/providers/gcp.rs +++ b/rust/lance-io/src/object_store/providers/gcp.rs @@ -96,3 +96,18 @@ impl StorageOptions { .collect() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gcs_store_path() { + let provider = GcsStoreProvider; + + let url = Url::parse("gs://bucket/path/to/file").unwrap(); + let path = provider.extract_path(&url); + let expected_path = object_store::path::Path::from("path/to/file"); + assert_eq!(path, expected_path); + } +} diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs index 0366094ef62..fa2b4474ffb 100644 --- a/rust/lance-io/src/object_store/providers/local.rs +++ b/rust/lance-io/src/object_store/providers/local.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use object_store::local::LocalFileSystem; +use object_store::{local::LocalFileSystem, path::Path}; use url::Url; use crate::object_store::{ @@ -31,4 +31,59 @@ impl ObjectStoreProvider for FileStoreProvider { download_retry_count, }) } + + fn extract_path(&self, url: &Url) -> object_store::path::Path { + url.to_file_path() + .ok() + .and_then(|p| Path::from_absolute_path(p).ok()) + .unwrap_or_else(|| Path::from(url.path())) + } +} + +#[cfg(test)] +mod tests { + use crate::object_store::uri_to_url; + + use super::*; + + #[test] + fn test_file_store_path() { + let provider = FileStoreProvider; + + let cases = [ + ("file:///", ""), + ("file:///usr/local/bin", "usr/local/bin"), + ("file-object-store:///path/to/file", "path/to/file"), + ("file:///path/to/foo/../bar", "path/to/bar"), + ]; + + for (uri, expected_path) in cases { + let url = uri_to_url(uri).unwrap(); + let path = provider.extract_path(&url); + assert_eq!(path.as_ref(), expected_path, "uri: '{}'", uri); + } + } + + #[test] + #[cfg(windows)] + fn test_file_store_path_windows() { + let provider = FileStoreProvider; + + let cases = [ + ( + "C:\\Users\\ADMINI~1\\AppData\\Local\\", + "C:/Users/ADMINI~1/AppData/Local", + ), + ( + "C:\\Users\\ADMINI~1\\AppData\\Local\\..\\", + "C:/Users/ADMINI~1/AppData", + ), + ]; + + for (uri, expected_path) in cases { + let url = uri_to_url(uri).unwrap(); + let path = provider.extract_path(&url); + assert_eq!(path.as_ref(), expected_path); + } + } } diff --git a/rust/lance-io/src/object_store/providers/memory.rs b/rust/lance-io/src/object_store/providers/memory.rs index 667058aef30..9c300e878a8 100644 --- a/rust/lance-io/src/object_store/providers/memory.rs +++ b/rust/lance-io/src/object_store/providers/memory.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use object_store::memory::InMemory; +use object_store::{memory::InMemory, path::Path}; use url::Url; use crate::object_store::{ @@ -31,4 +31,28 @@ impl ObjectStoreProvider for MemoryStoreProvider { download_retry_count, }) } + + fn extract_path(&self, url: &Url) -> Path { + let mut output = String::new(); + if let Some(domain) = url.domain() { + output.push_str(domain); + } + output.push_str(url.path()); + Path::from(output) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_memory_store_path() { + let provider = MemoryStoreProvider; + + let url = Url::parse("memory://path/to/file").unwrap(); + let path = provider.extract_path(&url); + let expected_path = Path::from("path/to/file"); + assert_eq!(path, expected_path); + } } diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index 2f751bdffdf..7b95eb6ca22 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -19,7 +19,7 @@ use self::index_extension::IndexExtension; pub mod index_extension; /// A user session tracks the runtime state. -#[derive(Clone, DeepSizeOf)] +#[derive(Clone)] pub struct Session { /// Cache for opened indices. pub(crate) index_cache: IndexCache, @@ -32,6 +32,18 @@ pub struct Session { store_registry: Arc, } +impl DeepSizeOf for Session { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + let mut size = 0; + size += self.index_cache.deep_size_of_children(context); + size += self.file_metadata_cache.deep_size_of_children(context); + for ext in self.index_extensions.values() { + size += ext.deep_size_of_children(context); + } + size + } +} + impl std::fmt::Debug for Session { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Session") From 33d14dd7dd205e102de7c28ebe6410c9cbeb859b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Apr 2025 19:14:10 -0700 Subject: [PATCH 13/13] fix --- rust/lance-index/src/scalar/btree.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index c9af009b964..b155e998e77 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1422,7 +1422,7 @@ mod tests { async fn test_null_ids() { let tmpdir = Arc::new(tempdir().unwrap()); let test_store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(tmpdir.path()).unwrap(), FileMetadataCache::no_cache(), )); @@ -1453,7 +1453,7 @@ mod tests { let remap_dir = Arc::new(tempdir().unwrap()); let remap_store = Arc::new(LanceIndexStore::new( - ObjectStore::local(), + Arc::new(ObjectStore::local()), Path::from_filesystem_path(remap_dir.path()).unwrap(), FileMetadataCache::no_cache(), ));