From 7d02f452042a3f22282fa14e541d9a911febc68a Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Sat, 2 May 2026 10:24:41 +0100 Subject: [PATCH] Avoid unnecessary I/O for cancelled requests, all-false filters, and segment cache hits Signed-off-by: Adam Gutglick --- vortex-file/src/read/driver.rs | 8 +- vortex-file/src/read/mod.rs | 3 + vortex-file/src/read/request.rs | 18 +++- vortex-file/src/segments/source.rs | 154 ++++++++++++++++++++++++++-- vortex-file/src/tests.rs | 145 ++++++++++++++++++++++++++ vortex-layout/src/scan/tasks.rs | 10 +- vortex-layout/src/segments/cache.rs | 67 +++++++++++- 7 files changed, 388 insertions(+), 17 deletions(-) diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index d8fd638adc8..86cc15dced8 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -136,7 +136,7 @@ impl State { tracing::debug!(?event, "Received ReadEvent"); match event { ReadEvent::Request(req) => { - if req.callback.is_closed() { + if req.is_closed() { tracing::debug!(?req, "ReadRequest dropped before registration"); return; } @@ -145,7 +145,7 @@ impl State { } ReadEvent::Polled(req_id) => { if let Some(req) = self.requests.remove(&req_id) { - if req.callback.is_closed() { + if req.is_closed() { self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } else { @@ -192,7 +192,7 @@ impl State { fn next_uncoalesced(&mut self) -> Option { while let Some((req_id, req)) = self.polled_requests.pop_first() { self.requests_by_offset.remove(&(req.offset, req_id)); - if req.callback.is_closed() { + if req.is_closed() { tracing::debug!("Dropping canceled request"); continue; } @@ -250,7 +250,7 @@ impl State { .vortex_expect("Missing request in requests_by_offset"); // Skip any cancelled requests - if req.callback.is_closed() { + if req.is_closed() { if ids_to_remove.insert(req_id) { keys_to_remove.push((req_offset, req_id)); } diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index a812b81f63b..8421b260fd8 100644 --- 
a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -5,5 +5,8 @@ mod driver; mod request; pub(crate) use driver::IoRequestStream; +#[cfg(test)] +pub(crate) use request::CoalescedRequest; +pub(crate) use request::IoRequest; pub(crate) use request::ReadRequest; pub(crate) use request::RequestId; diff --git a/vortex-file/src/read/request.rs b/vortex-file/src/read/request.rs index 7caaa08d3d8..d4cafbae3b0 100644 --- a/vortex-file/src/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -57,6 +57,14 @@ impl IoRequest { IoRequestInner::Coalesced(req) => req.resolve(result), } } + + /// Returns true if no callback remains to receive the read result. + pub(crate) fn is_cancelled(&self) -> bool { + match &self.0 { + IoRequestInner::Single(req) => req.is_closed(), + IoRequestInner::Coalesced(req) => req.is_cancelled(), + } + } } // Testing functionality @@ -100,12 +108,16 @@ impl Debug for ReadRequest { .field("offset", &self.offset) .field("length", &self.length) .field("alignment", &self.alignment) - .field("is_closed", &self.callback.is_closed()) + .field("is_closed", &self.is_closed()) .finish() } } impl ReadRequest { + pub(crate) fn is_closed(&self) -> bool { + self.callback.is_closed() + } + pub(crate) fn resolve(self, result: VortexResult) { if let Err(e) = self.callback.send(result) { tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id); @@ -132,6 +144,10 @@ impl Debug for CoalescedRequest { } impl CoalescedRequest { + pub(crate) fn is_cancelled(&self) -> bool { + self.requests.iter().all(ReadRequest::is_closed) + } + pub fn resolve(self, result: VortexResult) { match result { Ok(buffer) => { diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..958d363887a 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -30,6 +30,7 @@ use vortex_metrics::MetricBuilder; use vortex_metrics::MetricsRegistry; use crate::SegmentSpec; +use 
crate::read::IoRequest; use crate::read::IoRequestStream; use crate::read::ReadRequest; use crate::read::RequestId; @@ -111,12 +112,7 @@ impl FileSegmentSource { stream .map(move |req| { let reader = reader.clone(); - async move { - let result = reader - .read_at(req.offset(), req.len(), req.alignment()) - .await; - req.resolve(result); - } + drive_request(reader, req) }) .buffer_unordered(concurrency) .collect::<()>() @@ -133,6 +129,22 @@ impl FileSegmentSource { } } +async fn drive_request(reader: R, req: IoRequest) { + if req.is_cancelled() { + tracing::debug!( + offset = req.offset(), + length = req.len(), + "Skipping cancelled I/O request" + ); + return; + } + + let result = reader + .read_at(req.offset(), req.len(), req.alignment()) + .await; + req.resolve(result); +} + impl SegmentSource for FileSegmentSource { fn request(&self, id: SegmentId) -> SegmentFuture { // We eagerly register the read request here assuming the behaviour of [`FileRead`], where @@ -295,3 +307,133 @@ impl SegmentSource for BufferSegmentSource { future::ready(Ok(BufferHandle::new_host(slice))).boxed() } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use futures::future::BoxFuture; + + use super::*; + use crate::read::CoalescedRequest; + + #[derive(Clone, Default)] + struct CountingRead { + read_count: Arc, + } + + impl VortexReadAt for CountingRead { + fn size(&self) -> BoxFuture<'static, VortexResult> { + async { Ok(1024) }.boxed() + } + + fn concurrency(&self) -> usize { + 1 + } + + fn read_at( + &self, + _offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.read_count.fetch_add(1, Ordering::Relaxed); + async move { + let buffer = ByteBuffer::copy_from(vec![0; length]).aligned(alignment); + Ok(BufferHandle::new_host(buffer)) + } + .boxed() + } + } + + #[tokio::test] + async fn drive_request_skips_cancelled_single_request() { + let reader = 
CountingRead::default(); + let (callback, receiver) = oneshot::channel(); + drop(receiver); + + let req = IoRequest::new_single(ReadRequest { + id: 0, + offset: 0, + length: 16, + alignment: Alignment::none(), + callback, + }); + + drive_request(reader.clone(), req).await; + + assert_eq!(reader.read_count.load(Ordering::Relaxed), 0); + } + + #[tokio::test] + async fn drive_request_skips_fully_cancelled_coalesced_request() { + let reader = CountingRead::default(); + let (callback1, receiver1) = oneshot::channel(); + let (callback2, receiver2) = oneshot::channel(); + drop(receiver1); + drop(receiver2); + + let req = IoRequest::new_coalesced(CoalescedRequest { + range: 0..32, + alignment: Alignment::none(), + requests: vec![ + ReadRequest { + id: 0, + offset: 0, + length: 16, + alignment: Alignment::none(), + callback: callback1, + }, + ReadRequest { + id: 1, + offset: 16, + length: 16, + alignment: Alignment::none(), + callback: callback2, + }, + ], + }); + + drive_request(reader.clone(), req).await; + + assert_eq!(reader.read_count.load(Ordering::Relaxed), 0); + } + + #[tokio::test] + async fn drive_request_reads_coalesced_request_with_live_receiver() -> VortexResult<()> { + let reader = CountingRead::default(); + let (callback1, receiver1) = oneshot::channel(); + let (callback2, receiver2) = oneshot::channel(); + drop(receiver1); + + let req = IoRequest::new_coalesced(CoalescedRequest { + range: 0..32, + alignment: Alignment::none(), + requests: vec![ + ReadRequest { + id: 0, + offset: 0, + length: 16, + alignment: Alignment::none(), + callback: callback1, + }, + ReadRequest { + id: 1, + offset: 16, + length: 16, + alignment: Alignment::none(), + callback: callback2, + }, + ], + }); + + drive_request(reader.clone(), req).await; + + let buffer = receiver2.await.expect("live receiver should resolve")?; + assert_eq!(buffer.len(), 16); + assert_eq!(reader.read_count.load(Ordering::Relaxed), 1); + Ok(()) + } +} diff --git a/vortex-file/src/tests.rs 
b/vortex-file/src/tests.rs index 2ba10d96684..d587201399f 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -3,13 +3,16 @@ #![expect(clippy::cast_possible_truncation)] use std::iter; +use std::ops::Range; use std::sync::Arc; use std::sync::LazyLock; use bytes::Bytes; use futures::StreamExt; use futures::TryStreamExt; +use futures::future::BoxFuture; use futures::pin_mut; +use parking_lot::Mutex; use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; @@ -27,6 +30,7 @@ use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::arrays::struct_::StructArrayExt; use vortex_array::assert_arrays_eq; +use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::DecimalDType; use vortex_array::dtype::Nullability; @@ -58,12 +62,17 @@ use vortex_array::stats::PRUNING_STATS; use vortex_array::stream::ArrayStreamAdapter; use vortex_array::stream::ArrayStreamExt; use vortex_array::validity::Validity; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_buffer::buffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSession; use vortex_layout::Layout; +use vortex_layout::LayoutChildType; use vortex_layout::scan::scan_builder::ScanBuilder; use vortex_layout::session::LayoutSession; use vortex_session::VortexSession; @@ -87,6 +96,34 @@ static SESSION: LazyLock = LazyLock::new(|| { session }); +#[derive(Clone)] +struct RecordingRead { + inner: R, + reads: Arc>>>, +} + +impl VortexReadAt for RecordingRead { + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.inner.size() + } + + fn concurrency(&self) -> usize { + self.inner.concurrency() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + 
let end = + offset + u64::try_from(length).vortex_expect("read length must fit in u64 byte range"); + self.reads.lock().push(offset..end); + self.inner.read_at(offset, length, alignment) + } +} + #[tokio::test] async fn test_eof_values() { // this test exists as a reminder to think about whether we should increment the version @@ -515,6 +552,61 @@ async fn issue_5385_filter_casted_column() { ); } +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn all_false_filter_does_not_read_projected_column_segments() -> VortexResult<()> { + let row_count = 16_384; + let column_a = PrimitiveArray::from_iter(0..row_count).into_array(); + let column_b = + PrimitiveArray::from_iter((0..row_count).map(|value| value * 3 + 1)).into_array(); + let array = StructArray::from_fields(&[("a", column_a), ("b", column_b)])?.into_array(); + + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .with_file_statistics(vec![]) + .write(&mut buf, array.to_array_stream()) + .await?; + assert!(summary.footer().statistics().is_none()); + + let b_ranges = collect_field_segment_ranges( + summary.footer().layout().as_ref(), + "b", + summary.footer().segment_map(), + )?; + assert!(!b_ranges.is_empty(), "expected column b to have segments"); + + let reads = Arc::new(Mutex::new(Vec::new())); + let reader = RecordingRead { + inner: ByteBuffer::from(buf), + reads: Arc::clone(&reads), + }; + let file = SESSION + .open_options() + .with_footer(summary.footer().clone()) + .open_read(reader) + .await?; + + let result = file + .scan()? + .with_filter(gt(get_item("a", root()), lit(row_count))) + .with_projection(get_item("b", root())) + .into_array_stream()? 
+ .read_all() + .await?; + + assert_eq!(result.len(), 0); + + let read_ranges = reads.lock().clone(); + assert!( + read_ranges + .iter() + .all(|read| b_ranges.iter().all(|b| !ranges_overlap(read, b))), + "projection column b was read despite an all-false filter: reads={read_ranges:?}, b_ranges={b_ranges:?}" + ); + Ok(()) +} + #[tokio::test] #[cfg_attr(miri, ignore)] async fn filter_string() { @@ -1801,6 +1893,59 @@ fn collect_segment_offsets_inner( } } +fn collect_field_segment_ranges( + layout: &dyn Layout, + field_name: &str, + segment_specs: &[SegmentSpec], +) -> VortexResult>> { + let mut result = Vec::new(); + collect_field_segment_ranges_inner(layout, field_name, segment_specs, &mut result)?; + Ok(result) +} + +fn collect_field_segment_ranges_inner( + layout: &dyn Layout, + field_name: &str, + segment_specs: &[SegmentSpec], + result: &mut Vec>, +) -> VortexResult<()> { + for idx in 0..layout.nchildren() { + let child = layout.child(idx)?; + match layout.child_type(idx) { + LayoutChildType::Field(name) if name == field_name => { + collect_segment_ranges(child.as_ref(), segment_specs, result)?; + } + _ => { + collect_field_segment_ranges_inner( + child.as_ref(), + field_name, + segment_specs, + result, + )?; + } + } + } + Ok(()) +} + +fn collect_segment_ranges( + layout: &dyn Layout, + segment_specs: &[SegmentSpec], + result: &mut Vec>, +) -> VortexResult<()> { + for seg_id in layout.segment_ids() { + result.push(segment_specs[*seg_id as usize].byte_range()); + } + for child in layout.children()? { + collect_segment_ranges(child.as_ref(), segment_specs, result)?; + } + Ok(()) +} + +fn ranges_overlap(left: &Range, right: &Range) -> bool { + left.start < right.end && right.start < left.end +} + /// Assert that all offsets in `before` are less than all offsets in `after`. 
fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) { if let (Some(&max_before), Some(&min_after)) = (before.iter().max(), after.iter().min()) { diff --git a/vortex-layout/src/scan/tasks.rs b/vortex-layout/src/scan/tasks.rs index dc0b489b1f2..d135b5a9a79 100644 --- a/vortex-layout/src/scan/tasks.rs +++ b/vortex-layout/src/scan/tasks.rs @@ -136,11 +136,9 @@ pub fn split_exec( } }; - // Step 4: execute the projection, only at the mask for rows which match the filter - let projection_future = - ctx.reader - .projection_evaluation(&row_range, &ctx.projection, filter_mask.clone())?; - + // Step 4: execute the projection only if the filter kept rows. + let reader = Arc::clone(&ctx.reader); + let projection = ctx.projection.clone(); let mapper = Arc::clone(&ctx.mapper); let array_fut = async move { let mask = filter_mask.await?; @@ -148,6 +146,8 @@ pub fn split_exec( return Ok(None); } + let projection_future = + reader.projection_evaluation(&row_range, &projection, MaskFuture::ready(mask))?; let array = projection_future.await?; mapper(array).map(Some) }; diff --git a/vortex-layout/src/segments/cache.rs b/vortex-layout/src/segments/cache.rs index 37675c19d2a..2970102fe71 100644 --- a/vortex-layout/src/segments/cache.rs +++ b/vortex-layout/src/segments/cache.rs @@ -138,13 +138,14 @@ impl SegmentCacheSourceAdapter { impl SegmentSource for SegmentCacheSourceAdapter { fn request(&self, id: SegmentId) -> SegmentFuture { let cache = Arc::clone(&self.cache); - let delegate = self.source.request(id); + let source = Arc::clone(&self.source); async move { if let Ok(Some(segment)) = cache.get(id).await { tracing::debug!("Resolved segment {} from cache", id); return Ok(BufferHandle::new_host(segment)); } + let delegate = source.request(id); let result = delegate.await?; // Cache only CPU buffers; device buffers are not cached. 
if let Some(buffer) = result.as_host_opt() @@ -157,3 +158,67 @@ impl SegmentSource for SegmentCacheSourceAdapter { .boxed() } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use vortex_buffer::ByteBuffer; + + use super::*; + use crate::segments::SegmentSink; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + + #[derive(Default, Clone)] + struct CountingSegmentSource { + segments: TestSegments, + request_count: Arc, + } + + impl SegmentSource for CountingSegmentSource { + fn request(&self, id: SegmentId) -> SegmentFuture { + self.request_count.fetch_add(1, Ordering::Relaxed); + self.segments.request(id) + } + } + + #[tokio::test] + async fn cache_hit_does_not_request_inner_source() -> VortexResult<()> { + let id = SegmentId::from(0); + let data = ByteBuffer::copy_from([1, 2, 3, 4]); + let cache = Arc::new(MokaSegmentCache::new(1024)); + cache.put(id, data.clone()).await?; + + let source = CountingSegmentSource::default(); + let adapter = SegmentCacheSourceAdapter::new(cache, Arc::new(source.clone())); + + let result = adapter.request(id).await?; + + assert_eq!(result.unwrap_host(), data); + assert_eq!(source.request_count.load(Ordering::Relaxed), 0); + Ok(()) + } + + #[tokio::test] + async fn cache_miss_requests_inner_source_and_stores_host_buffer() -> VortexResult<()> { + let data = ByteBuffer::copy_from([5, 6, 7, 8]); + let source = CountingSegmentSource::default(); + let id = source + .segments + .write(SequenceId::root().downgrade(), vec![data.clone()]) + .await?; + + let cache = Arc::new(MokaSegmentCache::new(1024)); + let cache_source: Arc = Arc::::clone(&cache); + let adapter = SegmentCacheSourceAdapter::new(cache_source, Arc::new(source.clone())); + + let result = adapter.request(id).await?; + + assert_eq!(result.unwrap_host(), data); + assert_eq!(source.request_count.load(Ordering::Relaxed), 1); + assert_eq!(cache.get(id).await?.as_ref(), Some(&data)); + Ok(()) + } +}