Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions vortex-cuda/src/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//! A CUDA-optimized flat layout that inlines small constant array buffers into layout metadata.

use std::any::Any;
use std::collections::BTreeSet;
use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -49,6 +48,7 @@ use vortex::layout::LayoutReader;
use vortex::layout::LayoutReaderRef;
use vortex::layout::LayoutRef;
use vortex::layout::LayoutStrategy;
use vortex::layout::RowSplits;
use vortex::layout::SplitRange;
use vortex::layout::VTable;
use vortex::layout::layouts::SharedArrayFuture;
Expand Down Expand Up @@ -286,10 +286,10 @@ impl LayoutReader for CudaFlatReader {
&self,
_field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
split_range.check_bounds(self.layout.row_count)?;
splits.insert(split_range.root_row_range().end);
splits.push(split_range.root_row_range().end);
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions vortex-file/src/v2/file_stats_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//! [`FileStatsLayoutReader`] short-circuits [`pruning_evaluation`](LayoutReader::pruning_evaluation)
//! by returning an all-false mask — avoiding all downstream I/O.

use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -32,6 +31,7 @@ use vortex_error::VortexResult;
use vortex_layout::ArrayFuture;
use vortex_layout::LayoutReader;
use vortex_layout::LayoutReaderRef;
use vortex_layout::RowSplits;
use vortex_layout::SplitRange;
use vortex_mask::Mask;
use vortex_session::VortexSession;
Expand Down Expand Up @@ -158,7 +158,7 @@ impl LayoutReader for FileStatsLayoutReader {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
self.child.register_splits(field_mask, split_range, splits)
}
Expand Down
30 changes: 22 additions & 8 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::BTreeSet;
use std::future;
use std::ops::Range;
use std::sync::Arc;
use std::sync::LazyLock;

use futures::FutureExt;
use futures::TryStreamExt;
Expand All @@ -30,6 +30,7 @@ use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::layouts::chunked::ChunkedLayout;
use crate::reader::LayoutReader;
use crate::reader::RowSplits;
use crate::reader::SplitRange;
use crate::segments::SegmentSource;

Expand All @@ -42,6 +43,8 @@ pub struct ChunkedReader {
chunk_offsets: Vec<u64>,
}

/// Placeholder name shared by every chunk child reader when debug assertions are disabled,
/// so that no per-child name has to be formatted in that configuration.
static UNKNOWN: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("chunked-child"));

impl ChunkedReader {
pub fn new(
layout: ChunkedLayout,
Expand All @@ -58,9 +61,17 @@ impl ChunkedReader {
chunk_offsets[nchildren] = layout.row_count();

let dtypes = vec![layout.dtype.clone(); nchildren];
let names = (0..nchildren)
.map(|idx| Arc::from(format!("{name}.[{idx}]")))
.collect();

// `format!` has non-negligible overhead for short queries such as random-access
// benchmarks, so descriptive per-child names are only built when debug assertions
// are enabled.
let names = if cfg!(debug_assertions) {
(0..nchildren)
.map(|idx| Arc::from(format!("{name}.[{idx}]")))
.collect()
} else {
vec![Arc::clone(&*UNKNOWN); nchildren]
};

let lazy_children = LazyReaderChildren::new(
Arc::clone(&layout.children),
dtypes,
Expand Down Expand Up @@ -170,15 +181,18 @@ impl LayoutReader for ChunkedReader {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
split_range.check_bounds(self.layout.row_count())?;

if split_range.is_empty() {
return Ok(());
}

for (chunk_idx, chunk_start, child_range, _) in self.ranges(split_range.row_range()) {
let iter = self.ranges(split_range.row_range());
splits.reserve(iter.size_hint().0);

for (chunk_idx, chunk_start, child_range, _) in iter {
let child = self.chunk_reader(chunk_idx)?;
let child_row_offset = split_range
.row_offset()
Expand All @@ -189,7 +203,7 @@ impl LayoutReader for ChunkedReader {
child.register_splits(field_mask, &child_split_range, splits)?;

// Register the split indicating the end of this chunk
splits.insert(
splits.push(
split_range
.row_offset()
.checked_add(chunk_start + child_split_range.row_range().end)
Expand Down Expand Up @@ -448,7 +462,7 @@ mod test {
.splits(reader.as_ref(), &row_range, &[FieldMask::All])
.unwrap();

assert_eq!(splits, expected.into_iter().collect());
assert_eq!(splits, expected.into_iter().collect::<Vec<_>>());
}

#[rstest]
Expand Down
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/dict/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::BTreeSet;
use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -32,6 +31,7 @@ use vortex_utils::aliases::dash_map::DashMap;
use super::DictLayout;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::RowSplits;
use crate::SplitRange;
use crate::layouts::SharedArrayFuture;
use crate::segments::SegmentSource;
Expand Down Expand Up @@ -168,7 +168,7 @@ impl LayoutReader for DictReader {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
self.codes.register_splits(field_mask, split_range, splits)
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-layout/src/layouts/flat/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::BTreeSet;
use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
Expand All @@ -24,6 +23,7 @@ use vortex_session::VortexSession;
use crate::layouts::SharedArrayFuture;
use crate::layouts::flat::FlatLayout;
use crate::reader::LayoutReader;
use crate::reader::RowSplits;
use crate::reader::SplitRange;
use crate::segments::SegmentSource;

Expand Down Expand Up @@ -105,10 +105,10 @@ impl LayoutReader for FlatReader {
&self,
_field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
split_range.check_bounds(self.layout.row_count)?;
splits.insert(split_range.root_row_range().end);
splits.push(split_range.root_row_range().end);
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/row_idx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

mod expr;

use std::collections::BTreeSet;
use std::fmt::Display;
use std::fmt::Formatter;
use std::ops::BitAnd;
Expand Down Expand Up @@ -43,6 +42,7 @@ use vortex_utils::aliases::dash_map::DashMap;

use crate::ArrayFuture;
use crate::LayoutReader;
use crate::RowSplits;
use crate::SplitRange;
use crate::layouts::partitioned::PartitionedExprEval;

Expand Down Expand Up @@ -172,7 +172,7 @@ impl LayoutReader for RowIdxLayoutReader {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
self.child.register_splits(field_mask, split_range, splits)
}
Expand Down
6 changes: 3 additions & 3 deletions vortex-layout/src/layouts/struct_/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;
use std::sync::OnceLock;
Expand Down Expand Up @@ -43,6 +42,7 @@ use crate::ArrayFuture;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::RowSplits;
use crate::SplitRange;
use crate::layouts::partitioned::PartitionedExprEval;
use crate::layouts::struct_::StructLayout;
Expand Down Expand Up @@ -240,10 +240,10 @@ impl LayoutReader for StructReader {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
// In the case of an empty struct, we need to register the end split.
splits.insert(split_range.root_row_range().end);
splits.push(split_range.root_row_range().end);

// Register splits for the validity child, if there is one
if let Some(validity_ref) = self.validity()? {
Expand Down
4 changes: 2 additions & 2 deletions vortex-layout/src/layouts/zoned/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::BTreeSet;
use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
Expand All @@ -23,6 +22,7 @@ use vortex_session::VortexSession;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::RowSplits;
use crate::SplitRange;
use crate::layouts::zoned::ZonedLayout;
use crate::layouts::zoned::pruning::PruningState;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl LayoutReader for ZonedReader {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()> {
self.data_child()?
.register_splits(field_mask, split_range, splits)
Expand Down
30 changes: 28 additions & 2 deletions vortex-layout/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -93,6 +92,33 @@ impl SplitRange {
}
}

/// A collection of row split points gathered while registering splits on layout readers.
///
/// Rows may be pushed in any order and may repeat; `into_sorted_deduped` turns the
/// collected values into a sorted list without duplicates.
#[derive(Debug)]
pub struct RowSplits(Vec<u64>);

impl RowSplits {
/// Add row to splits
pub fn push(&mut self, row: u64) {
self.0.push(row);
}

/// Reserve capacity for at least `additional` more split points.
pub fn reserve(&mut self, additional: usize) {
Comment on lines +96 to +105
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put this in its own file?

self.0.reserve(additional);
}
Comment thread
myrrc marked this conversation as resolved.

/// Build an empty `RowSplits` whose backing vector is preallocated for `capacity` entries.
pub(crate) fn new_capacity(capacity: usize) -> Self {
    let rows = Vec::with_capacity(capacity);
    Self(rows)
}

pub(crate) fn into_sorted_deduped(mut self) -> Vec<u64> {
self.0.sort_unstable();
self.0.dedup();
self.0.shrink_to_fit();
self.0
}
Comment thread
myrrc marked this conversation as resolved.
}

/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
/// evaluation operations.
pub trait LayoutReader: 'static + Send + Sync {
Expand All @@ -113,7 +139,7 @@ pub trait LayoutReader: 'static + Send + Sync {
&self,
field_mask: &[FieldMask],
split_range: &SplitRange,
splits: &mut BTreeSet<u64>,
splits: &mut RowSplits,
) -> VortexResult<()>;

/// Returns a mask where all false values are proven to be false in the given expression.
Expand Down
9 changes: 6 additions & 3 deletions vortex-layout/src/scan/repeated_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,19 @@ impl<A: 'static + Send> RepeatedScan<A> {
let row_range = intersect_ranges(row_range.as_ref(), selection_range);

let ranges = match &self.splits {
Splits::Natural(btree_set) => {
Splits::Natural(vec) => {
debug_assert!(vec.is_sorted());
let splits_iter = match row_range {
None => Either::Left(btree_set.iter().copied()),
None => Either::Left(vec.iter().copied()),
Some(range) => {
if range.is_empty() {
return Ok(Vec::new());
}
let lo = vec.partition_point(|&x| x < range.start);
let hi = vec.partition_point(|&x| x < range.end);
Either::Right(
iter::once(range.start)
.chain(btree_set.range(range.clone()).copied())
.chain(vec[lo..hi].iter().copied())
.chain(iter::once(range.end)),
)
}
Expand Down
Loading
Loading