Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions vortex-duckdb/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ impl FileSystem for DuckDbFileSystem {
.boxed()
}

async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
// DuckDB's filesystem exposes no stat/exists call, so probe the path by opening it and
// reading its size. DuckDB does not distinguish "not found" from other open failures, so
// any open error is treated as a missing file (logged for diagnosability).
match self.open_read(path).await {
Ok(reader) => Ok(Some(FileListing {
path: path.to_string(),
size: Some(reader.size().await?),
})),
Err(e) => {
tracing::debug!("head({path}): treating open error as not-found: {e}");
Ok(None)
}
}
}

async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
let mut url = self.base_url.clone();
url.set_path(path);
Expand Down
2 changes: 1 addition & 1 deletion vortex-ffi/test/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ TEST_CASE("Creating datasources", "[datasource]") {
ds = vx_data_source_new(session, &opts, &error);
REQUIRE(ds == nullptr);
REQUIRE(error != nullptr);
REQUIRE_THAT(to_string(error), ContainsSubstring("No such file or directory"));
REQUIRE_THAT(to_string(error), ContainsSubstring("No files matched the glob pattern"));
vx_error_free(error);

opts.paths = "/tmp2/*.vortex";
Expand Down
4 changes: 4 additions & 0 deletions vortex-io/src/compat/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl<F: FileSystem> FileSystem for Compat<F> {
Compat::new(self.inner().list(prefix)).boxed()
}

async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
Compat::new(self.inner().head(path)).await
}

async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
let read_at = Compat::new(self.inner().open_read(path)).await?;
Ok(Arc::new(Compat::new(read_at)))
Expand Down
93 changes: 78 additions & 15 deletions vortex-io/src/filesystem/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use futures::stream::BoxStream;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
Expand All @@ -22,14 +23,18 @@ impl dyn FileSystem + '_ {
pub fn glob(&self, pattern: &str) -> VortexResult<BoxStream<'_, VortexResult<FileListing>>> {
validate_glob(pattern)?;

// If there are no glob characters, the pattern is an exact file path.
// Return it directly without listing the filesystem.
// If there are no glob characters, the pattern is an exact file path. `list` enumerates
// entries *under* a prefix on a path-segment basis and never yields the prefix itself, so
// listing an exact path would report an existing file as missing (and could surface prefix
// collisions such as `foo.vortex.backup` when the caller asked for `foo.vortex`). Use
// `head` to confirm the file exists and capture its size, yielding a single-element stream
// when it does and an empty stream when it does not.
if !pattern.contains(['*', '?', '[']) {
let listing = FileListing {
path: pattern.to_string(),
size: None,
};
return Ok(futures::stream::once(async { Ok(listing) }).boxed());
let pattern = pattern.to_string();
let stream = stream::once(async move { self.head(&pattern).await })
.try_filter_map(|listing| async move { Ok(listing) })
.boxed();
return Ok(stream);
}

let glob_pattern = glob::Pattern::new(pattern)
Expand Down Expand Up @@ -93,14 +98,41 @@ mod tests {
use crate::VortexReadAt;
use crate::filesystem::FileSystem;

/// A mock filesystem that panics if `list` is called.
/// A mock filesystem that resolves exact paths through [`head`](FileSystem::head) and
/// panics if [`list`](FileSystem::list) is called. This encodes the invariant the fix
/// depends on: the exact-path glob branch must never list, because an object store's `list`
/// does not return the exact path of a file.
#[derive(Debug)]
struct NoListFileSystem;
struct HeadFileSystem {
files: Vec<FileListing>,
}

impl HeadFileSystem {
fn new(files: &[(&str, u64)]) -> Self {
Self {
files: files
.iter()
.map(|&(path, size)| FileListing {
path: path.to_string(),
size: Some(size),
})
.collect(),
}
}
}

#[async_trait]
impl FileSystem for NoListFileSystem {
impl FileSystem for HeadFileSystem {
fn list(&self, _prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
vortex_panic!("list() should not be called for exact paths")
vortex_panic!("list() must not be called for an exact path; glob should use head()")
}

async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
Ok(self
.files
.iter()
.find(|listing| listing.path == path)
.cloned())
}

async fn open_read(&self, _path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
Expand All @@ -113,12 +145,43 @@ mod tests {
}

#[tokio::test]
async fn test_glob_exact_path_skips_list() -> VortexResult<()> {
let fs: &dyn FileSystem = &NoListFileSystem;
let results: Vec<FileListing> = fs.glob("data/file.vortex")?.try_collect().await?;
async fn test_glob_exact_path_existing_returns_listing_with_size() -> VortexResult<()> {
let fs = HeadFileSystem::new(&[("data/file.vortex", 1024)]);
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("data/file.vortex")?.try_collect().await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].path, "data/file.vortex");
assert_eq!(results[0].size, None);
assert_eq!(
results[0].size,
Some(1024),
"exact-path glob should propagate the size reported by head"
);
Ok(())
}

#[tokio::test]
async fn test_glob_exact_path_missing_returns_empty_stream() -> VortexResult<()> {
let fs = HeadFileSystem::new(&[]);
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("data/missing.vortex")?.try_collect().await?;
assert!(
results.is_empty(),
"missing exact path should yield an empty stream"
);
Ok(())
}

#[tokio::test]
async fn test_glob_exact_path_ignores_prefix_siblings() -> VortexResult<()> {
// A real object store lists by prefix and would surface `foo.vortex.backup` when asked to
// list `foo.vortex`. Resolving the exact path via head sidesteps that: only the requested
// key is returned, and the panicking `list` proves the branch never enumerated.
let fs = HeadFileSystem::new(&[("foo.vortex", 10), ("foo.vortex.backup", 20)]);
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("foo.vortex")?.try_collect().await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].path, "foo.vortex");
assert_eq!(results[0].size, Some(10));
Ok(())
}

Expand Down
21 changes: 21 additions & 0 deletions vortex-io/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ pub type FileSystemRef = Arc<dyn FileSystem>;
/// Implementations handle the details of a particular storage backend (local disk, S3, GCS, etc.)
/// while consumers work through this uniform interface.
///
/// # Paths
///
/// Path strings are *literal* object keys / file paths: the characters are used verbatim, with no
/// shell-style `~` expansion (`~` is a literal tilde, not the home directory) and no
/// percent-encoding or -decoding applied by this layer (`%20` is the three characters `%`, `2`,
/// `0`, not a space). A path produced by [`list`](FileSystem::list) or [`head`](FileSystem::head)
/// is the object's actual key, so it can be passed straight back to
/// [`open_read`](FileSystem::open_read) — including when it contains characters such as `~`, `%`,
/// `[`, `]`, or `#`.
///
/// # Future Work
///
/// An `open_write` method will be added once [`VortexWrite`](crate::VortexWrite) is
Expand All @@ -51,6 +61,17 @@ pub trait FileSystem: Debug + Send + Sync {
/// callers should sort if deterministic ordering is required.
fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>>;

/// Fetch metadata for the file at the exact `path`, if it exists.
///
/// Unlike [`list`](FileSystem::list), which enumerates files *under* a prefix on a
/// path-segment basis and never yields the prefix itself, `head` looks up the object at
/// exactly `path`. It is the correct primitive for confirming that a single known file
/// exists and reading its size.
///
/// Returns `Ok(Some(_))` with the file's [`FileListing`] when it exists, `Ok(None)` when no
/// file exists at `path`, and `Err(_)` for any other failure (I/O or permission errors, etc.).
async fn head(&self, path: &str) -> VortexResult<Option<FileListing>>;

/// Open a file for reading at the given path.
async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>>;

Expand Down
12 changes: 12 additions & 0 deletions vortex-io/src/filesystem/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ impl FileSystem for PrefixFileSystem {
.boxed()
}

async fn head(&self, path: &str) -> VortexResult<Option<FileListing>> {
let full_path = format!("{}{}", self.prefix, path.trim_start_matches('/'));
Ok(self.inner.head(&full_path).await?.map(|mut listing| {
listing.path = listing
.path
.strip_prefix(&self.prefix)
.unwrap_or(&listing.path)
.to_string();
listing
}))
}

async fn open_read(&self, path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
self.inner
.open_read(&format!("{}{}", self.prefix, path.trim_start_matches('/')))
Expand Down
Loading
Loading