Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vortex-file/src/read/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl State {
tracing::debug!(?event, "Received ReadEvent");
match event {
ReadEvent::Request(req) => {
if req.callback.is_closed() {
if req.is_closed() {
tracing::debug!(?req, "ReadRequest dropped before registration");
return;
}
Expand All @@ -145,7 +145,7 @@ impl State {
}
ReadEvent::Polled(req_id) => {
if let Some(req) = self.requests.remove(&req_id) {
if req.callback.is_closed() {
if req.is_closed() {
self.requests_by_offset.remove(&(req.offset, req_id));
tracing::debug!(?req, "ReadRequest dropped before poll");
} else {
Expand Down Expand Up @@ -192,7 +192,7 @@ impl State {
fn next_uncoalesced(&mut self) -> Option<ReadRequest> {
while let Some((req_id, req)) = self.polled_requests.pop_first() {
self.requests_by_offset.remove(&(req.offset, req_id));
if req.callback.is_closed() {
if req.is_closed() {
tracing::debug!("Dropping canceled request");
continue;
}
Expand Down Expand Up @@ -250,7 +250,7 @@ impl State {
.vortex_expect("Missing request in requests_by_offset");

// Skip any cancelled requests
if req.callback.is_closed() {
if req.is_closed() {
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
Expand Down
3 changes: 3 additions & 0 deletions vortex-file/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ mod driver;
mod request;

pub(crate) use driver::IoRequestStream;
#[cfg(test)]
pub(crate) use request::CoalescedRequest;
pub(crate) use request::IoRequest;
pub(crate) use request::ReadRequest;
pub(crate) use request::RequestId;
18 changes: 17 additions & 1 deletion vortex-file/src/read/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ impl IoRequest {
IoRequestInner::Coalesced(req) => req.resolve(result),
}
}

/// Returns true if no callback remains to receive the read result.
pub(crate) fn is_cancelled(&self) -> bool {
match &self.0 {
IoRequestInner::Single(req) => req.is_closed(),
IoRequestInner::Coalesced(req) => req.is_cancelled(),
}
}
}

// Testing functionality
Expand Down Expand Up @@ -100,12 +108,16 @@ impl Debug for ReadRequest {
.field("offset", &self.offset)
.field("length", &self.length)
.field("alignment", &self.alignment)
.field("is_closed", &self.callback.is_closed())
.field("is_closed", &self.is_closed())
.finish()
}
}

impl ReadRequest {
pub(crate) fn is_closed(&self) -> bool {
self.callback.is_closed()
}

pub(crate) fn resolve(self, result: VortexResult<BufferHandle>) {
if let Err(e) = self.callback.send(result) {
tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
Expand All @@ -132,6 +144,10 @@ impl Debug for CoalescedRequest {
}

impl CoalescedRequest {
pub(crate) fn is_cancelled(&self) -> bool {
self.requests.iter().all(ReadRequest::is_closed)
}

pub fn resolve(self, result: VortexResult<BufferHandle>) {
match result {
Ok(buffer) => {
Expand Down
154 changes: 148 additions & 6 deletions vortex-file/src/segments/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use vortex_metrics::MetricBuilder;
use vortex_metrics::MetricsRegistry;

use crate::SegmentSpec;
use crate::read::IoRequest;
use crate::read::IoRequestStream;
use crate::read::ReadRequest;
use crate::read::RequestId;
Expand Down Expand Up @@ -111,12 +112,7 @@ impl FileSegmentSource {
stream
.map(move |req| {
let reader = reader.clone();
async move {
let result = reader
.read_at(req.offset(), req.len(), req.alignment())
.await;
req.resolve(result);
}
drive_request(reader, req)
})
.buffer_unordered(concurrency)
.collect::<()>()
Expand All @@ -133,6 +129,22 @@ impl FileSegmentSource {
}
}

async fn drive_request<R: VortexReadAt>(reader: R, req: IoRequest) {
if req.is_cancelled() {
tracing::debug!(
offset = req.offset(),
length = req.len(),
"Skipping cancelled I/O request"
);
return;
}

let result = reader
.read_at(req.offset(), req.len(), req.alignment())
.await;
req.resolve(result);
}

impl SegmentSource for FileSegmentSource {
fn request(&self, id: SegmentId) -> SegmentFuture {
// We eagerly register the read request here assuming the behaviour of [`FileRead`], where
Expand Down Expand Up @@ -295,3 +307,133 @@ impl SegmentSource for BufferSegmentSource {
future::ready(Ok(BufferHandle::new_host(slice))).boxed()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use futures::future::BoxFuture;

use super::*;
use crate::read::CoalescedRequest;

#[derive(Clone, Default)]
struct CountingRead {
read_count: Arc<AtomicUsize>,
}

impl VortexReadAt for CountingRead {
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
async { Ok(1024) }.boxed()
}

fn concurrency(&self) -> usize {
1
}

fn read_at(
&self,
_offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<BufferHandle>> {
self.read_count.fetch_add(1, Ordering::Relaxed);
async move {
let buffer = ByteBuffer::copy_from(vec![0; length]).aligned(alignment);
Ok(BufferHandle::new_host(buffer))
}
.boxed()
}
}

#[tokio::test]
async fn drive_request_skips_cancelled_single_request() {
let reader = CountingRead::default();
let (callback, receiver) = oneshot::channel();
drop(receiver);

let req = IoRequest::new_single(ReadRequest {
id: 0,
offset: 0,
length: 16,
alignment: Alignment::none(),
callback,
});

drive_request(reader.clone(), req).await;

assert_eq!(reader.read_count.load(Ordering::Relaxed), 0);
}

#[tokio::test]
async fn drive_request_skips_fully_cancelled_coalesced_request() {
let reader = CountingRead::default();
let (callback1, receiver1) = oneshot::channel();
let (callback2, receiver2) = oneshot::channel();
drop(receiver1);
drop(receiver2);

let req = IoRequest::new_coalesced(CoalescedRequest {
range: 0..32,
alignment: Alignment::none(),
requests: vec![
ReadRequest {
id: 0,
offset: 0,
length: 16,
alignment: Alignment::none(),
callback: callback1,
},
ReadRequest {
id: 1,
offset: 16,
length: 16,
alignment: Alignment::none(),
callback: callback2,
},
],
});

drive_request(reader.clone(), req).await;

assert_eq!(reader.read_count.load(Ordering::Relaxed), 0);
}

#[tokio::test]
async fn drive_request_reads_coalesced_request_with_live_receiver() -> VortexResult<()> {
let reader = CountingRead::default();
let (callback1, receiver1) = oneshot::channel();
let (callback2, receiver2) = oneshot::channel();
drop(receiver1);

let req = IoRequest::new_coalesced(CoalescedRequest {
range: 0..32,
alignment: Alignment::none(),
requests: vec![
ReadRequest {
id: 0,
offset: 0,
length: 16,
alignment: Alignment::none(),
callback: callback1,
},
ReadRequest {
id: 1,
offset: 16,
length: 16,
alignment: Alignment::none(),
callback: callback2,
},
],
});

drive_request(reader.clone(), req).await;

let buffer = receiver2.await.expect("live receiver should resolve")?;
assert_eq!(buffer.len(), 16);
assert_eq!(reader.read_count.load(Ordering::Relaxed), 1);
Ok(())
}
}
Loading
Loading