Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions vortex-file/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ pub fn vortex_file::Footer::into_serializer(self) -> vortex_file::FooterSerializ

pub fn vortex_file::Footer::layout(&self) -> &vortex_layout::layout::LayoutRef

pub fn vortex_file::Footer::metadata_segment(&self, &str) -> core::option::Option<&vortex_buffer::ByteBuffer>

pub fn vortex_file::Footer::metadata_segments(&self) -> impl core::iter::traits::iterator::Iterator<Item = (&str, &vortex_buffer::ByteBuffer)>

pub fn vortex_file::Footer::row_count(&self) -> u64

pub fn vortex_file::Footer::segment_map(&self) -> &alloc::sync::Arc<[vortex_file::SegmentSpec]>
Expand All @@ -206,10 +210,14 @@ pub fn vortex_file::FooterDeserializer::buffer(&self) -> &vortex_buffer::ByteBuf

pub fn vortex_file::FooterDeserializer::deserialize(&mut self) -> vortex_error::VortexResult<vortex_file::DeserializeStep>

pub fn vortex_file::FooterDeserializer::include_metadata(self) -> Self

pub fn vortex_file::FooterDeserializer::prefix_data(&mut self, vortex_buffer::ByteBuffer)

pub fn vortex_file::FooterDeserializer::with_dtype(self, vortex_array::dtype::DType) -> Self

pub fn vortex_file::FooterDeserializer::with_include_metadata(self, bool) -> Self

pub fn vortex_file::FooterDeserializer::with_size(self, u64) -> Self

pub fn vortex_file::FooterDeserializer::with_some_dtype(self, core::option::Option<vortex_array::dtype::DType>) -> Self
Expand Down Expand Up @@ -276,6 +284,10 @@ pub fn vortex_file::VortexFile::footer(&self) -> &vortex_file::Footer

pub fn vortex_file::VortexFile::layout_reader(&self) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_layout::reader::LayoutReader>>

pub fn vortex_file::VortexFile::metadata_segment(&self, &str) -> core::option::Option<&vortex_buffer::ByteBuffer>

pub fn vortex_file::VortexFile::metadata_segments(&self) -> impl core::iter::traits::iterator::Iterator<Item = (&str, &vortex_buffer::ByteBuffer)>

pub fn vortex_file::VortexFile::row_count(&self) -> u64

pub fn vortex_file::VortexFile::scan(&self) -> vortex_error::VortexResult<vortex_layout::scan::scan_builder::ScanBuilder<vortex_array::array::erased::ArrayRef>>
Expand All @@ -292,6 +304,8 @@ pub struct vortex_file::VortexOpenOptions

impl vortex_file::VortexOpenOptions

pub fn vortex_file::VortexOpenOptions::include_metadata(self) -> Self

pub async fn vortex_file::VortexOpenOptions::open(self, alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>) -> vortex_error::VortexResult<vortex_file::VortexFile>

pub fn vortex_file::VortexOpenOptions::open_buffer<B: core::convert::Into<vortex_buffer::ByteBuffer>>(self, B) -> vortex_error::VortexResult<vortex_file::VortexFile>
Expand All @@ -306,6 +320,8 @@ pub fn vortex_file::VortexOpenOptions::with_file_size(self, u64) -> Self

pub fn vortex_file::VortexOpenOptions::with_footer(self, vortex_file::Footer) -> Self

pub fn vortex_file::VortexOpenOptions::with_include_metadata(self, bool) -> Self

pub fn vortex_file::VortexOpenOptions::with_initial_read_size(self, usize) -> Self

pub fn vortex_file::VortexOpenOptions::with_labels(self, alloc::vec::Vec<vortex_metrics::Label>) -> Self
Expand Down Expand Up @@ -338,6 +354,10 @@ pub fn vortex_file::VortexWriteOptions::new(vortex_session::VortexSession) -> Se

pub fn vortex_file::VortexWriteOptions::with_file_statistics(self, alloc::vec::Vec<vortex_array::expr::stats::Stat>) -> Self

pub fn vortex_file::VortexWriteOptions::with_metadata_segment(self, impl core::convert::Into<alloc::string::String>, impl core::convert::Into<vortex_buffer::ByteBuffer>) -> Self

pub fn vortex_file::VortexWriteOptions::with_metadata_segments<I, K, B>(self, I) -> Self where I: core::iter::traits::collect::IntoIterator<Item = (K, B)>, K: core::convert::Into<alloc::string::String>, B: core::convert::Into<vortex_buffer::ByteBuffer>

pub fn vortex_file::VortexWriteOptions::with_strategy(self, alloc::sync::Arc<dyn vortex_layout::strategy::LayoutStrategy>) -> Self

pub struct vortex_file::WriteStrategyBuilder
Expand Down
11 changes: 11 additions & 0 deletions vortex-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use vortex_array::dtype::FieldPathSet;
use vortex_array::expr::Expression;
use vortex_array::expr::pruning::checked_pruning_expr;
use vortex_array::scalar_fn::internal::row_count::substitute_row_count;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexResult;
use vortex_layout::LayoutReader;
use vortex_layout::scan::layout::LayoutReaderDataSource;
Expand Down Expand Up @@ -76,6 +77,16 @@ impl VortexFile {
self.footer.statistics()
}

/// Returns the user-defined metadata segments loaded for this file.
///
/// Yields `(key, buffer)` pairs by delegating to `Footer::metadata_segments` on this file's
/// footer, so only segments held by that footer are returned.
pub fn metadata_segments(&self) -> impl Iterator<Item = (&str, &ByteBuffer)> {
self.footer.metadata_segments()
}

/// Returns the loaded user-defined metadata segment for the given key.
///
/// Delegates to `Footer::metadata_segment` on this file's footer and returns `None` when the
/// footer holds no segment under `key`.
pub fn metadata_segment(&self, key: &str) -> Option<&ByteBuffer> {
self.footer.metadata_segment(key)
}

/// Create a new segment source for reading from the file.
///
/// This may spawn a background I/O driver that will exit when the returned segment source
Expand Down
87 changes: 82 additions & 5 deletions vortex-file/src/footer/deserializer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use flatbuffers::root;
use vortex_array::dtype::DType;
use vortex_buffer::ByteBuffer;
Expand All @@ -12,13 +14,15 @@ use vortex_error::vortex_err;
use vortex_flatbuffers::FlatBuffer;
use vortex_flatbuffers::ReadFlatBuffer;
use vortex_session::VortexSession;
use vortex_utils::aliases::hash_map::HashMap;

use crate::EOF_SIZE;
use crate::Footer;
use crate::MAGIC_BYTES;
use crate::VERSION;
use crate::footer::FileStatistics;
use crate::footer::postscript::Postscript;
use crate::footer::postscript::PostscriptMetadata;
use crate::footer::postscript::PostscriptSegment;

/// Deserialize a footer from the end of a Vortex file or created from a
Expand All @@ -37,6 +41,8 @@ pub struct FooterDeserializer {

// The file size, possibly provided externally.
file_size: Option<u64>,
// Whether to include user-defined metadata segments in the footer read.
include_metadata: bool,
// The postscript, once we've parsed it.
postscript: Option<Postscript>,
}
Expand All @@ -48,6 +54,7 @@ impl FooterDeserializer {
session,
dtype: None,
file_size: None,
include_metadata: false,
postscript: None,
}
}
Expand All @@ -72,6 +79,18 @@ impl FooterDeserializer {
self
}

/// Opt in to loading user-defined metadata segments during footer deserialization.
///
/// Shorthand for `with_include_metadata(true)`.
pub fn include_metadata(self) -> Self {
    self.with_include_metadata(true)
}

/// Set whether user-defined metadata segments are included in footer deserialization.
///
/// A freshly constructed deserializer has this disabled. When enabled, the metadata segment
/// offsets listed in the postscript are taken into account when deciding how far back the
/// read must extend, and each segment is parsed into the resulting footer. When disabled, the
/// footer is built with an empty metadata map.
pub fn with_include_metadata(mut self, include_metadata: bool) -> Self {
self.include_metadata = include_metadata;
self
}

/// Prefix more data to the existing buffer when requested by the deserializer.
pub fn prefix_data(&mut self, more_data: ByteBuffer) {
let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
Expand Down Expand Up @@ -119,6 +138,11 @@ impl FooterDeserializer {
if let Some(stats_segment) = &postscript.statistics {
read_more_offset = read_more_offset.min(stats_segment.offset);
}
if self.include_metadata {
for metadata in &postscript.metadata {
read_more_offset = read_more_offset.min(metadata.segment.offset);
}
}
read_more_offset = read_more_offset.min(postscript.layout.offset);
read_more_offset = read_more_offset.min(postscript.footer.offset);

Expand Down Expand Up @@ -151,14 +175,27 @@ impl FooterDeserializer {
)
})
.transpose()?;
let metadata = if self.include_metadata {
Arc::new(
postscript
.metadata
.iter()
.map(|metadata| {
self.parse_metadata_segment(initial_offset, &self.buffer, metadata)
})
.collect::<VortexResult<HashMap<_, _>>>()?,
)
} else {
Arc::new(HashMap::default())
};

Ok(DeserializeStep::Done(self.parse_footer(
initial_offset,
&self.buffer,
&postscript.footer,
&postscript.layout,
postscript,
dtype,
file_stats,
metadata,
)?))
}

Expand Down Expand Up @@ -238,27 +275,67 @@ impl FooterDeserializer {
FileStatistics::from_flatbuffer(&fb, dtype, session)
}

/// Parse a user-defined metadata segment from the initial read buffer.
///
/// `initial_offset` is treated as the file offset of the first byte of `initial_read`; the
/// segment's offset is rebased against it before slicing. On success, returns the segment's
/// key together with its bytes, passed through `aligned(..)` with the alignment recorded for
/// the segment.
///
/// # Errors
///
/// Returns an error if the segment starts before `initial_offset`, if the rebased offset does
/// not fit in `usize`, if `offset + length` overflows `usize`, or if the resulting range
/// extends past the end of `initial_read`.
fn parse_metadata_segment(
    &self,
    initial_offset: u64,
    initial_read: &ByteBuffer,
    metadata: &PostscriptMetadata,
) -> VortexResult<(String, ByteBuffer)> {
    // The segment offset comes from the file's postscript. Rebase it with a checked
    // subtraction so an offset that precedes the buffered region becomes an error instead of
    // an arithmetic overflow (panic in debug builds, wrap-around in release builds).
    let relative_offset = metadata
        .segment
        .offset
        .checked_sub(initial_offset)
        .ok_or_else(|| {
            vortex_err!(
                "Metadata segment {} offset {} precedes initial read offset {}",
                metadata.key,
                metadata.segment.offset,
                initial_offset
            )
        })?;
    let offset = usize::try_from(relative_offset)?;
    // NOTE(review): `as` cast kept from the original; confirm the segment length type can
    // never exceed `usize` on supported targets.
    let length = metadata.segment.length as usize;
    let end = offset
        .checked_add(length)
        .ok_or_else(|| vortex_err!("Metadata segment range overflowed usize"))?;

    // Reject ranges that run past the bytes we actually hold before slicing.
    if end > initial_read.len() {
        vortex_bail!(
            "Metadata segment {} range {}..{} out of bounds for initial read of length {}",
            metadata.key,
            offset,
            end,
            initial_read.len()
        );
    }

    Ok((
        metadata.key.clone(),
        initial_read
            .slice_unaligned(offset..end)
            .aligned(metadata.segment.alignment),
    ))
}

/// Parse the rest of the footer from the initial read.
///
/// Copies the footer and layout flatbuffers out of `initial_read`, locating each one by its
/// segment offset relative to `initial_offset` and its segment length, then passes them to
/// `Footer::from_flatbuffer` together with the dtype, the optional file statistics and the
/// user-defined metadata map.
///
/// NOTE(review): this listing comes from a diff view without +/- markers. The
/// `footer_segment`/`layout_segment` parameters and the single-line
/// `Footer::from_flatbuffer(...)` call appear next to the `postscript`-based lines that seem
/// to replace them — confirm against the actual file before relying on this text.
fn parse_footer(
&self,
initial_offset: u64,
initial_read: &[u8],
footer_segment: &PostscriptSegment,
layout_segment: &PostscriptSegment,
postscript: &Postscript,
dtype: DType,
file_stats: Option<FileStatistics>,
metadata: Arc<HashMap<String, ByteBuffer>>,
) -> VortexResult<Footer> {
// Locate the footer flatbuffer inside the buffered tail of the file.
let footer_segment = &postscript.footer;
let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
let footer_bytes = FlatBuffer::copy_from(
&initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
);

// Locate the layout flatbuffer the same way.
let layout_segment = &postscript.layout;
let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
let layout_bytes = FlatBuffer::copy_from(
&initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
);

Footer::from_flatbuffer(footer_bytes, layout_bytes, dtype, file_stats, &self.session)
Footer::from_flatbuffer(
footer_bytes,
layout_bytes,
dtype,
file_stats,
metadata,
&self.session,
)
}
}

Expand Down
32 changes: 32 additions & 0 deletions vortex-file/src/footer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,29 @@ use vortex_layout::layout_from_flatbuffer_with_options;
use vortex_layout::session::LayoutSessionExt;
use vortex_session::VortexSession;
use vortex_session::registry::ReadContext;
use vortex_utils::aliases::hash_map::HashMap;

/// Maximum number of user-defined metadata segments stored in the postscript.
///
/// The postscript has a fixed 64 KiB budget and must remain small enough for readers to discover
/// the footer and required segment locations from the initial read. A limit of sixteen entries
/// keeps metadata bookkeeping well under 2 KiB for typical key lengths while leaving almost all
/// of the postscript budget available for the required file segments.
pub(crate) const MAX_METADATA_SEGMENTS: usize = 16;

/// Maximum length, in UTF-8 bytes, of a user-defined metadata key.
///
/// Metadata keys are stored directly in the fixed-size postscript. Keeping keys short prevents
/// user-defined labels from consuming a disproportionate share of the postscript budget.
///
/// The limit counts encoded bytes rather than characters, so keys containing multi-byte
/// characters reach it with fewer than 32 characters.
pub(crate) const MAX_METADATA_KEY_BYTES: usize = 32;

/// Captures the layout information of a Vortex file.
#[derive(Debug, Clone)]
pub struct Footer {
root_layout: LayoutRef,
segments: Arc<[SegmentSpec]>,
statistics: Option<FileStatistics>,
metadata: Arc<HashMap<String, ByteBuffer>>,
// The specific arrays used within the file, in the order they were registered.
array_read_ctx: ReadContext,
// The approximate size of the footer in bytes, used for caching and memory management.
Expand All @@ -56,12 +72,14 @@ impl Footer {
root_layout: LayoutRef,
segments: Arc<[SegmentSpec]>,
statistics: Option<FileStatistics>,
metadata: Arc<HashMap<String, ByteBuffer>>,
array_read_ctx: ReadContext,
) -> Self {
Self {
root_layout,
segments,
statistics,
metadata,
array_read_ctx,
approx_byte_size: None,
}
Expand All @@ -78,6 +96,7 @@ impl Footer {
layout_bytes: FlatBuffer,
dtype: DType,
statistics: Option<FileStatistics>,
metadata: Arc<HashMap<String, ByteBuffer>>,
session: &VortexSession,
) -> VortexResult<Self> {
let approx_byte_size = footer_bytes.len() + layout_bytes.len();
Expand Down Expand Up @@ -126,6 +145,7 @@ impl Footer {
root_layout,
segments,
statistics,
metadata,
array_read_ctx,
approx_byte_size: Some(approx_byte_size),
})
Expand All @@ -146,6 +166,18 @@ impl Footer {
self.statistics.as_ref()
}

/// Returns the user-defined metadata segments loaded for this file.
///
/// Yields one `(key, buffer)` pair per entry in the footer's metadata map, with the key
/// borrowed as a `&str`. The iterator is empty when the footer holds no metadata.
///
/// NOTE(review): entries come straight from a hash map, so the iteration order should
/// presumably not be relied upon — confirm against the `HashMap` alias in use.
pub fn metadata_segments(&self) -> impl Iterator<Item = (&str, &ByteBuffer)> {
self.metadata
.iter()
.map(|(key, metadata)| (key.as_str(), metadata))
}

/// Returns the loaded user-defined metadata segment for the given key.
///
/// Looks `key` up in the footer's metadata map and returns `None` when no segment is stored
/// under it.
pub fn metadata_segment(&self, key: &str) -> Option<&ByteBuffer> {
self.metadata.get(key)
}

/// Returns the [`DType`] of the file.
pub fn dtype(&self) -> &DType {
self.root_layout.dtype()
Expand Down
Loading
Loading